static void Main(string[] args) { Thread t = new Thread(PrintNumbers); t.Start();//線程開始執行 PrintNumbers(); Console.ReadKey(); } static void PrintNumbers() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Console.WriteLine(i); } }
class Program { static void Main(string[] args) { Thread t = new Thread(PrintNumbersWithDelay); t.Start(); PrintNumbers(); Console.ReadKey(); } static void PrintNumbers() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Console.WriteLine(i); } } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2));//暫停2S Console.WriteLine(i); } } }
當程序運行時,會建立一個線程,該線程會執行PrintNumbersWithDelay方法中的代碼。而後會當即執行PrintNumbers方法。關鍵之處在於在PrintNumbersWithDelay方法中加入了Thread.Sleep方法調用。這將致使線程執行該代碼時,在打印任何數字以前會等待指定的時間(本例中是2秒鐘),當線程處於休眠狀態時,它會佔用盡量少的CPU時間。結果咱們4·會發現一般後運行的PrintNumbers方法中的代碼會比獨立線程中的PrintNumbersWithDelay方法中的代碼先執行。html
class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithDelay); t.Start(); t.Join(); Console.WriteLine("Thread completed"); } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
當程序運行時,啓動了一個耗時較長的線程來打印數字,打印每一個數字前要等待兩秒。但咱們在主程序中調用了t.Join方法,該方法容許咱們等待直到線程t完成。當線程t完成 "時,主程序會繼續運行。藉助該技術能夠實如今兩個線程間同步執行步驟。第一個線程會等待另外一個線程完成後再繼續執行。第一個線程等待時是處於阻塞狀態(正如暫停線程中調用 Thread.Sleep方法同樣),程序員
class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithDelay); t.Start(); Thread.Sleep(TimeSpan.FromSeconds(6)); t.Abort(); Console.WriteLine("A thread has been aborted"); } static void PrintNumbersWithDelay() { Console.WriteLine("Starting..."); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
當主程序和單獨的數字打印線程運行時,咱們等待6秒後對線程調用了t.Abort方法。這給線程注入了ThreadAbortException方法,致使線程被終結。這很是危險,由於該異常能夠在任什麼時候刻發生並可能完全摧毀應用程序。另外,使用該技術也不必定總能終止線程。目-標線程能夠經過處理該異常並調用Thread.ResetAbort方法來拒絕被終止。所以並不推薦使用,Abort方法來關閉線程。可優先使用一些其餘方法,好比提供一個CancellationToken方法來,取消線程的執行。web
class Program { static void Main(string[] args) { Console.WriteLine("Starting program..."); Thread t = new Thread(PrintNumbersWithStatus); Thread t2 = new Thread(DoNothing); Console.WriteLine(t.ThreadState.ToString()); t2.Start(); t.Start(); for (int i = 1; i < 30; i++) { Console.WriteLine(t.ThreadState.ToString()); } Thread.Sleep(TimeSpan.FromSeconds(6)); t.Abort(); Console.WriteLine("A thread has been aborted"); Console.WriteLine(t.ThreadState.ToString()); Console.WriteLine(t2.ThreadState.ToString()); Console.ReadKey(); } static void DoNothing() { Thread.Sleep(TimeSpan.FromSeconds(2)); } static void PrintNumbersWithStatus() { Console.WriteLine("Starting..."); Console.WriteLine(Thread.CurrentThread.ThreadState.ToString()); for (int i = 1; i < 10; i++) { Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(i); } } }
當主程序啓動時定義了兩個不一樣的線程。一個將被終止,另外一個則會成功完成運行。線,.程狀態位於Thread對象的ThreadState屬性中。ThreadState屬性是一個C#枚舉對象。剛開始線程狀態爲ThreadState.Unstarted,而後咱們啓動線程,並估計在一個週期爲30次迭代的,區間中,線程狀態會從ThreadState.Running變爲ThreadState. WaitSleepJoin。算法
請注意始終能夠經過Thread.CurrentThread靜態屬性得到當前Thread對象。
若是實際狀況與以上不符,請增長迭代次數。終止第一個線程後,會看到如今該線程狀態爲ThreadState.Aborted,程序也有可能會打印出ThreadState.AbortRequested狀態。這充分說明了同步兩個線程的複雜性。請記住不要在程序中使用線程終止。我在這裏使用它只是爲 ,了展現相應的線程狀態。數據庫
最後能夠看到第二個線程t2成功完成而且狀態爲ThreadState.Stopped。另外還有一些其,他的線程狀態,可是要麼已經被棄用,要麼沒有咱們實驗過的幾種狀態有用。編程
class Program { static void Main(string[] args) { Console.WriteLine("Current thread priority: {0}", Thread.CurrentThread.Priority); Console.WriteLine("Running on all cores available"); RunThreads(); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Running on a single core"); Process.GetCurrentProcess().ProcessorAffinity = new IntPtr(1); RunThreads(); } static void RunThreads() { var sample = new ThreadSample(); var threadOne = new Thread(sample.CountNumbers); threadOne.Name = "ThreadOne"; var threadTwo = new Thread(sample.CountNumbers); threadTwo.Name = "ThreadTwo"; threadOne.Priority = ThreadPriority.Highest; threadTwo.Priority = ThreadPriority.Lowest; threadOne.Start(); threadTwo.Start(); Thread.Sleep(TimeSpan.FromSeconds(2)); sample.Stop(); Console.ReadKey(); } class ThreadSample { private bool _isStopped = false; public void Stop() { _isStopped = true; } public void CountNumbers() { long counter = 0; while (!_isStopped) { counter++; } Console.WriteLine("{0} with {1,11} priority " + "has a count = {2,13}", Thread.CurrentThread.Name, Thread.CurrentThread.Priority, counter.ToString("N0")); } } }
當主程序啓動時定義了兩個不一樣的線程。第一個線程優先級爲ThreadPriority.Highest,即具備最高優先級。第二個線程優先級爲ThreadPriority.Lowest,即具備最低優先級。咱們先, ,打印出主線程的優先級值,而後在全部可用的CPU核心上啓動這兩個線程。若是擁有一個1以上的計算核心,將在兩秒鐘內獲得初步結果。最高優先級的線程一般會計算更多的迭代.可是兩個值應該很接近。然而,若是有其餘程序佔用了全部的CPU核心運行負載,結果則會大相徑庭。數組
爲了模擬該情形,咱們設置了ProcessorAffinity選項,讓操做系統將全部的線程運,行在單個CPU核心(第一個核心)上。如今結果徹底不一樣,而且計算耗時將超過2秒鐘。 .這是由於CPU核心大部分時間在運行高優先級的線程,只留給剩下的線程不多的時間來,運行。瀏覽器
請注意這是操做系統使用線程優先級的一個演示。一般你無需使用這種行爲編寫程序。安全
class Program { static void Main(string[] args) { var sampleForeground = new ThreadSample(10); var sampleBackground = new ThreadSample(20); var threadOne = new Thread(sampleForeground.CountNumbers); threadOne.Name = "ForegroundThread"; var threadTwo = new Thread(sampleBackground.CountNumbers); threadTwo.Name = "BackgroundThread"; threadTwo.IsBackground = true; threadOne.Start(); threadTwo.Start(); Console.ReadKey(); } class ThreadSample { private readonly int _iterations; public ThreadSample(int iterations) { _iterations = iterations; } public void CountNumbers() { for (int i = 0; i < _iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } } }
當主程序啓動時定義了兩個不一樣的線程。默認狀況下,顯式建立的線程是前臺線程。經過手動的設置threadTwo對象的IsBackground屬性爲ture來建立一個後臺線程。經過配置來實現第一個線程會比第二個線程先完成。而後運行程序。服務器
第一個線程完成後,程序結束而且後臺線程被終結。這是前臺線程與後臺線程的主要區,別:進程會等待全部的前臺線程完成後再結束工做,可是若是隻剩下後臺線程,則會直接結束工做。
一個重要注意事項是若是程序定義了一個不會完成的前臺線程,主程序並不會正常結束。
class Program { static void Main(string[] args) { var sample = new ThreadSample(10); var threadOne = new Thread(sample.CountNumbers); threadOne.Name = "ThreadOne"; threadOne.Start(); threadOne.Join(); Console.WriteLine("--------------------------"); var threadTwo = new Thread(Count); threadTwo.Name = "ThreadTwo"; threadTwo.Start(8); threadTwo.Join(); Console.WriteLine("--------------------------"); var threadThree = new Thread(() => CountNumbers(12)); threadThree.Name = "ThreadThree"; threadThree.Start(); threadThree.Join(); Console.WriteLine("--------------------------"); int i = 10; var threadFour = new Thread(() => PrintNumber(i)); i = 20; var threadFive = new Thread(() => PrintNumber(i)); threadFour.Start(); threadFive.Start(); } static void Count(object iterations) { CountNumbers((int)iterations); } static void CountNumbers(int iterations) { for (int i = 1; i <= iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } static void PrintNumber(int number) { Console.WriteLine(number); } class ThreadSample { private readonly int _iterations; public ThreadSample(int iterations) { _iterations = iterations; } public void CountNumbers() { for (int i = 1; i <= _iterations; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine("{0} prints {1}", Thread.CurrentThread.Name, i); } } } }
當主程序啓動時,首先建立了ThreadSample類的一個對象,並提供了一個迭代次數。而後使用該對象的CountNumbers方法啓動線程。該方法運行在另外一個線程中,可是使用數 ,字10,該數字是經過ThreadSample對象的構造函數傳入的。所以,咱們只是使用相同的間接方式將該迭代次數傳遞給另外一個線程。
另外一種傳遞數據的方式是使用Thread.Start方法。該方法會接收一個對象,並將該對象,傳遞給線程。爲了應用該方法,在線程中啓動的方法必須接受object類型的單個參數。在建立threadTwo線程時演示了該方式。咱們將8做爲一個對象傳遞給了Count方法,而後 Count方法被轉換爲整型。
接下來的方式是使用lambda表達式。lambda表達式定義了一個不屬於任何類的方法。咱們建立了一個方法,該方法使用須要的參數調用了另外一個方法,並在另外一個線程中運行該 ,方法。當啓動threadThree線程時,打印出了12個數字,這正是咱們經過lambda表達式傳遞,的數字。
使用lambda表達式引用另外一個C#對象的方式被稱爲閉包。當在lambda表達式中使用任何局部變量時, C#會生成一個類,並將該變量做爲該類的一個屬性。因此實際上該方式與 threadOne線程中使用的同樣,可是咱們無須定義該類, C#編譯器會自動幫咱們實現。
這可能會致使幾個問題。例如,若是在多個lambda表達式中使用相同的變量,它們會共享該變量值。在前一個例子中演示了這種狀況。當啓動threadFour和threadFive線程時,.它們都會打印20,由於在這兩個線程啓動以前變量被修改成20。
class Program { static void Main(string[] args) { Console.WriteLine("Incorrect counter"); var c = new Counter(); var t1 = new Thread(() => TestCounter(c)); var t2 = new Thread(() => TestCounter(c)); var t3 = new Thread(() => TestCounter(c)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}",c.Count); Console.WriteLine("--------------------------"); Console.WriteLine("Correct counter"); var c1 = new CounterWithLock(); t1 = new Thread(() => TestCounter(c1)); t2 = new Thread(() => TestCounter(c1)); t3 = new Thread(() => TestCounter(c1)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c1.Count); Console.ReadKey(); } static void TestCounter(CounterBase c) { for (int i = 0; i < 100000; i++) { c.Increment(); c.Decrement(); } } class Counter : CounterBase { public int Count { get; private set; } public override void Increment() { Count++; } public override void Decrement() { Count--; } } class CounterWithLock : CounterBase { private readonly object _syncRoot = new Object(); public int Count { get; private set; } public override void Increment() { lock (_syncRoot) { Count++; } } public override void Decrement() { lock (_syncRoot) { Count--; } } } abstract class CounterBase { public abstract void Increment(); public abstract void Decrement(); } }
當主程序啓動時,建立了一個Counter類的對象。該類定義了一個能夠遞增和遞減的簡,單的計數器。而後咱們啓動了三個線程。這三個線程共享同一個counter實例,在一個週期中進行一次遞增和一次遞減。這將致使不肯定的結果。若是運行程序屢次,則會打印出多個不一樣的計數器值。結果多是0,但大多數狀況下則不是0.
這是由於Counter類並非線程安全的。當多個線程同時訪問counter對象時,第一個線程獲得的counter值10並增長爲11,而後第二個線程獲得的值是11並增長爲12,第一個線程獲得counter值12,可是遞減操做發生前,第二個線程獲得的counter值也是12,而後 , 第一個線程將12遞減爲11並保存回counter中,同時第二個線程進行了一樣的操做。結果,咱們進行了兩次遞增操做可是隻有一次遞減操做,這顯然不對。這種情形被稱爲競爭條件, (race condition),競爭條件是多線程環境中很是常見的致使錯誤的緣由。
爲了確保不會發生以上情形,必須保證當有線程操做counter對象時,全部其餘線程必須等待直到當前線程完成操做。咱們能夠使用lock關鍵字來實現這種行爲。若是鎖定了一個對象,須要訪問該對象的全部其餘線程則會處於阻塞狀態,並等待直到該對象解除鎖定。這,可能會致使嚴重的性能問題,在第2章中將會進一步學習該知識點。
class Program { static void Main(string[] args) { object lock1 = new object(); object lock2 = new object(); new Thread(() => LockTooMuch(lock1, lock2)).Start(); lock (lock2) { Thread.Sleep(1000); Console.WriteLine("Monitor.TryEnter allows not to get stuck, returning false after a specified timeout is elapsed"); if (Monitor.TryEnter(lock1, TimeSpan.FromSeconds(5))) { Console.WriteLine("Acquired a protected resource succesfully"); } else { Console.WriteLine("Timeout acquiring a resource!"); } } new Thread(() => LockTooMuch(lock1, lock2)).Start(); Console.WriteLine("----------------------------------"); lock (lock2) { Console.WriteLine("This will be a deadlock!"); Thread.Sleep(1000); lock (lock1) { Console.WriteLine("Acquired a protected resource succesfully"); } } Console.ReadKey(); } static void LockTooMuch(object lock1, object lock2) { lock (lock1) { Thread.Sleep(1000); lock (lock2); } } }
先看看LockTooMuch方法。在該方法中咱們先鎖定了第一個對象,等待一秒後鎖定了 ,第二個對象。而後在另外一個線程中啓動該方法。最後嘗試在主線程中前後鎖定第二個和第一個對象。
若是像該示例的第二部分同樣使用lock關鍵字,將會形成死鎖。第一個線程保持對, lock1對象的鎖定,等待直到lock2對象被釋放。主線程保持對lock2對象的鎖定並等待直到。lock1對象被釋放,但lock1對象永遠不會被釋放。
實際上lock關鍵字是Monitor類用例的一個語法糖。若是咱們分解使用了lock關鍵字的代碼,將會看到它以下面代碼片斷所示:
bool acquiredLock = false; try { Monitor.Enter(lockObject, ref acquiredLock); } finally { if (acquiredLock) { Monitor.Exit(lockObject); } }
所以,咱們能夠直接使用Monitor類。其擁有TryEnter方法,該方法接受一個超時, "參數。若是在咱們可以獲取被lock保護的資源以前,超時參數過時,則該方法會返回 false.
class Program { static void Main(string[] args) { var t = new Thread(FaultyThread); t.Start(); t.Join(); try { t = new Thread(BadFaultyThread); t.Start(); } catch (Exception ex) { Console.WriteLine("We won't get here!"); } } static void BadFaultyThread() { Console.WriteLine("Starting a faulty thread..."); Thread.Sleep(TimeSpan.FromSeconds(2)); throw new Exception("Boom!"); } static void FaultyThread() { try { Console.WriteLine("Starting a faulty thread..."); Thread.Sleep(TimeSpan.FromSeconds(1)); throw new Exception("Boom!"); } catch (Exception ex) { Console.WriteLine("Exception handled: {0}", ex.Message); } } }
當主程序啓動時,定義了兩個將會拋出異常的線程。其中一個對異常進行了處理,另外一個則沒有。能夠看到第二個異常沒有被包裹啓動線程的try/catch代碼塊捕獲到。因此若是直接使用線程,通常來講不要在線程中拋出異常,而是在線程代碼中使用try/catch代碼塊。
在較老版本的.NET Framework中(1.0和1.1),該行爲是不同的,未被捕獲的異常不會強制應用程序關閉。能夠經過添加一個包含如下代碼片斷的應用程序配置文件(好比app config)來使用該策略。
<configuration> <runtime> <legacyUnhandledExceptionPolicy enable="1" /> </runtime> </configuration>
正如前面所看到的同樣,多個線程同時使用共享對象會形成不少問題。同步這些線程使得對共享對象的操做可以以正確的順序執行是很是重要的。在使用C#中的lock關鍵字,咱們遇到了一個叫做競爭條件的問題。致使這問題的緣由是多線程的執行並無正確同步。當一個線程執行遞增和遞減操做時,其餘線程須要依次等待。這種常見問題一般被稱爲線程同步。
有多種方式來實現線程同步。首先,若是無須共享對象,那麼就無須進行線程同步。令,人驚奇的是大多數時候能夠經過從新設計程序來除移共享狀態,從而去掉複雜的同步構造。請儘量避免在多個線程間使用單一對象。
若是必須使用共享的狀態,第二種方式是隻使用原子操做。這意味着一個操做只佔用一個量子的時間,一次就能夠完成。因此只有當前操做完成後,其餘線程才能執行其餘操做。所以,你無須實現其餘線程等待當前操做完成,這就避免了使用鎖,也排除了死鎖的狀況。
若是上面的方式不可行,而且程序的邏輯更加複雜,那麼咱們不得不使用不一樣的方式來,協調線程。方式之一是將等待的線程置於阻塞狀態。當線程處於阻塞狀態時,只會佔用儘量少的CPU時間。然而,這意味着將引入至少一次所謂的上下文切換( context switch),上下文切換是指操做系統的線程調度器。該調度器會保存等待的線程的狀態,並切換到另外一個.線程,依次恢復等待的線程的狀態。這須要消耗至關多的資源。然而,若是線程要被掛起很,長時間,那麼這樣作是值得的。這種方式又被稱爲內核模式(kernel-mode),由於只有操做系,統的內核才能阻止線程使用CPU時間。
萬一線程只須要等待一小段時間,最好只是簡單的等待,而不用將線程切換到阻塞狀,態。雖然線程等待時會浪費CPU時間,但咱們節省了上下文切換耗費的CPU時間。該方式又被稱爲用戶模式(user-mode),該方式很是輕量,速度很快,但若是線程須要等待較長時間則會浪費大量的CPU時間。
爲了利用好這兩種方式,能夠使用混合模式(hybrid),混合模式先嚐試使用用戶模式等,待,若是線程等待了足夠長的時間,則會切換到阻塞狀態以節省CPU資源。
本節將展現如何對對象執行基本的原子操做,從而不用阻塞線程就可避免競爭條件。
internal class Program { private static void Main(string[] args) { Console.WriteLine("Incorrect counter"); var c = new Counter(); var t1 = new Thread(() => TestCounter(c)); var t2 = new Thread(() => TestCounter(c)); var t3 = new Thread(() => TestCounter(c)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c.Count); Console.WriteLine("--------------------------"); Console.WriteLine("Correct counter"); var c1 = new CounterNoLock(); t1 = new Thread(() => TestCounter(c1)); t2 = new Thread(() => TestCounter(c1)); t3 = new Thread(() => TestCounter(c1)); t1.Start(); t2.Start(); t3.Start(); t1.Join(); t2.Join(); t3.Join(); Console.WriteLine("Total count: {0}", c1.Count); Console.ReadKey(); } static void TestCounter(CounterBase c) { for (int i = 0; i < 100000; i++) { c.Increment(); c.Decrement(); } } class Counter : CounterBase { private int _count; public int Count { get { return _count; } } public override void Increment() { _count++; } public override void Decrement() { _count--; } } class CounterNoLock : CounterBase { private int _count; public int Count { get { return _count; } } public override void Increment() { Interlocked.Increment(ref _count); } public override void Decrement() { Interlocked.Decrement(ref _count); } } abstract class CounterBase { public abstract void Increment(); public abstract void Decrement(); } }
當程序運行時,會建立三個線程來運行TestCounter方法中的代碼。該方法對一個對象,按序執行了遞增或遞減操做。起初的Counter對象不是線程安全的,咱們會遇到競爭條件。因此第一個例子中計數器的結果值是不肯定的。咱們可能會獲得數字0,然而若是運行程序屢次,你將最終獲得一些不正確的非零結果。在第1部分中,咱們經過鎖定對象解決了這個問題。在一個線程獲取舊的計數器值並計,算後賦予新的值以前,其餘線程都被阻塞了。然而,若是咱們採用上述方式執行該操做中途不能中止。而藉助於Interlocked類,咱們無需鎖定任何對象便可獲取到正確的結果。Interlocked提供了Increment, Decrement和Add等基本數學操做的原子方法,從而幫助咱們,在編寫Counter類時無需使用鎖。
本節將描述如何使用Mutex類來同步兩個單獨的程序。Mutex是一種原始的同步方式,其只對一個線程授予對共享資源的獨佔訪問。
class Program { static void Main(string[] args) { const string MutexName = "CSharpThreadingCookbook"; using (var m = new Mutex(false, MutexName)) { if (!m.WaitOne(TimeSpan.FromSeconds(5), false)) { Console.WriteLine("Second instance is running!"); } else { Console.WriteLine("Running!"); Console.ReadLine(); m.ReleaseMutex(); } } } }
當主程序啓動時,定義了一個指定名稱的互斥量,設置initialOwner標誌爲false。這意.味着若是互斥量已經被建立,則容許程序獲取該互斥量。若是沒有獲取到互斥量,程序則簡單地顯示Running,等待直到按下了任何鍵,而後釋放該互斥量並退出。
若是再運行一樣一個程序,則會在5秒鐘內嘗試獲取互斥量。若是此時在第一個程序中,按下了任何鍵,第二個程序則會開始執行。然而,若是保持等待5秒鐘,第二個程序將沒法,獲取到該瓦斥量。
本節將展現SemaphoreSlim類是如何做爲Semaphore類的輕量級版本的。該類限制了同時訪問同一個資源的線程數量。
class Program { static void Main(string[] args) { for (int i = 1; i <= 6; i++) { string threadName = "Thread " + i; int secondsToWait = 2 + 2 * i; var t = new Thread(() => AccessDatabase(threadName, secondsToWait)); t.Start(); } } static SemaphoreSlim _semaphore = new SemaphoreSlim(4); static void AccessDatabase(string name, int seconds) { Console.WriteLine("{0} waits to access a database", name); _semaphore.Wait(); Console.WriteLine("{0} was granted an access to a database", name); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} is completed", name); _semaphore.Release(); } }
當主程序啓動時,建立了SemaphoreSlim的一個實例,並在其構造函數中指定容許的併發線程數量。而後啓動了6個不一樣名稱和不一樣初始運行時間的線程。
每一個線程都嘗試獲取數據庫的訪問,可是咱們藉助於信號系統限制了訪問數據庫的併發,數爲4個線程。當有4個線程獲取了數據庫的訪問後,其餘兩個線程須要等待,直到以前線,程中的某一個完成工做並調用semaphore.Release方法來發出信號。
這裏咱們使用了混合模式,其容許咱們在等待時間很短的狀況下無需使用上下文切換。然而,有一個叫做Semaphore的SemaphoreSlim類的老版本。該版本使用純粹的內核時間 ( kernel-time)方式。通常不必使用它,除非是很是重要的場景。咱們能夠建立一個具名的semaphore,就像一個具名的mutex同樣,從而在不一樣的程序中同步線程。SemaphoreSlim並不使用Windows內核信號量,並且也不支持進程間同步。因此在跨程序同步的場景下能夠使用Semaphore.
本示例藉助於AutoResetEvent類來從一個線程向另外一個線程發送通知。AutoResetEvent類能夠通知等待的線程有某事件發生。
class Program { static void Main(string[] args) { var t = new Thread(() => Process(10)); t.Start(); Console.WriteLine("Waiting for another thread to complete work"); _workerEvent.WaitOne(); Console.WriteLine("First operation is completed!"); Console.WriteLine("Performing an operation on a main thread"); Thread.Sleep(TimeSpan.FromSeconds(5)); _mainEvent.Set(); Console.WriteLine("Now running the second operation on a second thread"); _workerEvent.WaitOne(); Console.WriteLine("Second operation is completed!"); Console.ReadKey(); } private static AutoResetEvent _workerEvent = new AutoResetEvent(false); private static AutoResetEvent _mainEvent = new AutoResetEvent(false); static void Process(int seconds) { Console.WriteLine("Starting a long running work..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("Work is done!"); _workerEvent.Set(); Console.WriteLine("Waiting for a main thread to complete its work"); _mainEvent.WaitOne(); Console.WriteLine("Starting second operation..."); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("Work is done!"); _workerEvent.Set(); } }
當主程序啓動時,定義了兩個AutoResetEvent實例。其中一個是從子線程向主線程發信號,另外一個實例是從主線程向子線程發信號。咱們向AutoResetEvent構造方法傳人false,定義了這兩個實例的初始狀態爲unsignaled。這意味着任何線程調用這兩個對象中的任何一個的WaitOne方法將會被阻塞,直到咱們調用了Set方法。若是初始事件狀態爲true,那麼 AutoResetEvent實例的狀態爲signaled,若是線程調用WaitOne方法則會被當即處理。而後事件狀態自動變爲unsignaled,因此須要再對該實例調用一次Set方法,以便讓其餘的線程對,該實例調用WaitOne方法從而繼續執行。
而後咱們建立了第二個線程,其會執行第一個操做10秒鐘,而後等待從第二個線程發,出的信號。該信號意味着第一個操做已經完成。如今第二個線程在等待主線程的信號。咱們對主線程作了一些附加工做,並經過調用mainEvent.Set方法發送了一個信號。而後等待從第二個線程發出的另外一個信號。
AutoResetEvent類採用的是內核時間模式,因此等待時間不能太長。使用ManualResetEventslim類更好,由於它使用的是混合模式。
本節將描述如何使用ManualResetEventSlim類來在線程間以更靈活的方式傳遞信號。
class Program { static void Main(string[] args) { var t1 = new Thread(() => TravelThroughGates("Thread 1", 5)); var t2 = new Thread(() => TravelThroughGates("Thread 2", 6)); var t3 = new Thread(() => TravelThroughGates("Thread 3", 12)); t1.Start(); t2.Start(); t3.Start(); Thread.Sleep(TimeSpan.FromSeconds(6)); Console.WriteLine("The gates are now open!"); _mainEvent.Set(); Thread.Sleep(TimeSpan.FromSeconds(2)); _mainEvent.Reset(); Console.WriteLine("The gates have been closed!"); Thread.Sleep(TimeSpan.FromSeconds(10)); Console.WriteLine("The gates are now open for the second time!"); _mainEvent.Set(); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("The gates have been closed!"); _mainEvent.Reset(); Console.ReadKey(); } static void TravelThroughGates(string threadName, int seconds) { Console.WriteLine("{0} falls to sleep", threadName); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} waits for the gates to open!", threadName); _mainEvent.Wait(); Console.WriteLine("{0} enters the gates!", threadName); } static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false); }
當主程序啓動時,首先建立了ManualResetEventSlim類的一個實例。而後啓動了三個線程,等待事件信號通知它們繼續執行。
ManualResetEvnetSlim的整個工做方式有點像人羣經過大門。而AutoResetEvent事件像一個旋轉門,一次只容許一人經過。ManualResetEventSlim是ManualResetEvent的混合版本,一直保持大門敞開直到手動調用Reset方法。當調用mainEvent.Set時,至關於打開了大門從而容許準備好的線程接收信號並繼續工做。然而線程3還處於睡眠 "狀態,沒有遇上時間。當調用mainEvent.Reset至關於關閉了大門。最後一個線程已經準備好執行,可是不得不等待下一個信號,即要等待好幾秒鐘。
本節將描述如何使用CountdownEvent信號類來等待直到必定數量的操做完成。
class Program { static void Main(string[] args) { Console.WriteLine("Starting two operations"); var t1 = new Thread(() => PerformOperation("Operation 1 is completed", 4)); var t2 = new Thread(() => PerformOperation("Operation 2 is completed", 8)); t1.Start(); t2.Start(); _countdown.Wait(); Console.WriteLine("Both operations have been completed."); _countdown.Dispose(); Console.ReadKey(); } static CountdownEvent _countdown = new CountdownEvent(2); static void PerformOperation(string message, int seconds) { Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine(message); _countdown.Signal(); } }
當主程序啓動時,建立了一個CountdownEvent實例,在其構造函數中指定了當兩個操,做完成時會發出信號。而後咱們啓動了兩個線程,當它們執行完成後會發出信號。一旦第二個線程完成,主線程會從等待CountdownEvent的狀態中返回並繼續執行。針對須要等待多,個異步操做完成的情形,使用該方式是很是便利的。
然而這有一個重大的缺點。若是調用countdown.Signal()沒達到指定的次數,那麼-countdown. Wait()將一直等待。請確保使用CountdownEvent時,全部線程完成後都要調用,Signal方法。
本節將展現另外一種有意思的同步方式,被稱爲Barrier, Barrier類用於組織多個線程及時, 在某個時刻碰面。其提供了一個回調函數,每次線程調用了SignalAndWait方法後該回調函數會被執行。
class Program { static void Main(string[] args) { var t1 = new Thread(() => PlayMusic("the guitarist", "play an amazing solo", 5)); var t2 = new Thread(() => PlayMusic("the singer", "sing his song", 2)); t1.Start(); t2.Start(); Console.ReadKey(); } static Barrier _barrier = new Barrier(2,b => Console.WriteLine("End of phase {0}", b.CurrentPhaseNumber + 1)); static void PlayMusic(string name, string message, int seconds) { for (int i = 1; i < 3; i++) { Console.WriteLine("----------------------------------------------"); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} starts to {1}", name, message); Thread.Sleep(TimeSpan.FromSeconds(seconds)); Console.WriteLine("{0} finishes to {1}", name, message); _barrier.SignalAndWait(); } } }
咱們建立了Barrier類,指定了咱們想要同步兩個線程。在兩個線程中的任何一個調用了-barrier.SignalAndWait方法後,會執行一個回調函數來打印出階段。
每一個線程將向Barrier發送兩次信號,因此會有兩個階段。每次這兩個線程調用Signal AndWait方法時, Barrier將執行回調函數。這在多線程迭代運算中很是有用,能夠在每一個迭代,結束前執行一些計算。當最後一個線程調用SignalAndWait方法時能夠在迭代結束時進行交互。
本節將描述如何使用ReaderWriterLockSlim來建立一個線程安全的機制,在多線程中對,一個集合進行讀寫操做。ReaderWriterLockSlim表明了一個管理資源訪問的鎖,容許多個線程同時讀取,以及獨佔寫。
class Program { static void Main(string[] args) { new Thread(Read){ IsBackground = true }.Start(); new Thread(Read){ IsBackground = true }.Start(); new Thread(Read){ IsBackground = true }.Start(); new Thread(() => Write("Thread 1")){ IsBackground = true }.Start(); new Thread(() => Write("Thread 2")){ IsBackground = true }.Start(); Thread.Sleep(TimeSpan.FromSeconds(30)); Console.ReadKey(); } static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim(); static Dictionary<int, int> _items = new Dictionary<int, int>(); static void Read() { Console.WriteLine("Reading contents of a dictionary"); while (true) { try { _rw.EnterReadLock(); foreach (var key in _items.Keys) { Thread.Sleep(TimeSpan.FromSeconds(0.1)); } } finally { _rw.ExitReadLock(); } } } static void Write(string threadName) { while (true) { try { int newKey = new Random().Next(250); _rw.EnterUpgradeableReadLock(); if (!_items.ContainsKey(newKey)) { try { _rw.EnterWriteLock(); _items[newKey] = 1; Console.WriteLine("New key {0} is added to a dictionary by a {1}", newKey, threadName); } finally { _rw.ExitWriteLock(); } } Thread.Sleep(TimeSpan.FromSeconds(0.1)); } finally { _rw.ExitUpgradeableReadLock(); } } } }
當主程序啓動時,同時運行了三個線程來從字典中讀取數據,還有另外兩個線程向該字典中寫入數據。咱們使用ReaderWriterLockSlim類來實現線程安全,該類專爲這樣的場景而設計。
這裏使用兩種鎖:讀鎖容許多線程讀取數據,寫鎖在被釋放前會阻塞了其餘線程的所,有操做。獲取讀鎖時還有一個有意思的場景,即從集合中讀取數據時,根據當前數據而決,定是否獲取一個寫鎖並修改該集合。一旦獲得寫鎖,會阻止閱讀者讀取數據,從而浪費大量的時間,所以獲取寫鎖後集合會處於阻塞狀態。爲了最小化阻塞浪費的時間,能夠使用 EnterUpgradeableReadLock和ExitUpgradeableReadLock方法。先獲取讀鎖後讀取數據。若是發現必須修改底層集合,只需使用EnterWriteLock方法升級鎖,而後快速執行一次寫操做.最後使用ExitWriteLock釋放寫鎖。
在本例中,咱們先生成一個隨機數。而後獲取讀鎖並檢查該數是否存在於字典的鍵集合中。若是不存在,將讀鎖更新爲寫鎖而後將該新鍵加入到字典中。始終使用tyr/finaly代碼塊來確保在捕獲鎖後必定會釋放鎖,這是一項好的實踐。全部的線程都被建立爲後臺線程。
主線程在全部後臺線程完成後會等待30秒。
本節將描述如何不使用內核模型的方式來使線程等待。另外,咱們介紹了SpinWait,它, ,是一個混合同步構造,被設計爲使用用戶模式等待一段時間,而後切換到內核模式以節省CPU時間。
class Program { static void Main(string[] args) { var t1 = new Thread(UserModeWait); var t2 = new Thread(HybridSpinWait); Console.WriteLine("Running user mode waiting"); t1.Start(); Thread.Sleep(20); _isCompleted = true; Thread.Sleep(TimeSpan.FromSeconds(1)); _isCompleted = false; Console.WriteLine("Running hybrid SpinWait construct waiting"); t2.Start(); Thread.Sleep(5); _isCompleted = true; Console.ReadKey(); } static volatile bool _isCompleted = false; static void UserModeWait() { while (!_isCompleted) { Console.Write("."); } Console.WriteLine(); Console.WriteLine("Waiting is complete"); } static void HybridSpinWait() { var w = new SpinWait(); while (!_isCompleted) { w.SpinOnce(); Console.WriteLine(w.NextSpinWillYield); } Console.WriteLine("Waiting is complete"); } }
當主程序啓動時,定義了一個線程,將執行一個無止境的循環,直到20毫秒後主線程,設置_isCompleted變量爲true,咱們能夠試驗運行該週期爲20-30秒,經過Windows任務管理器測量CPU的負載狀況。取決於CPU內核數量,任務管理器將顯示一個顯著的處理時間。
咱們使用volatile關鍵字來聲明isCompleted靜態字段。Volatile關鍵字指出一個字段可能會被同時執行的多個線程修改。聲明爲volatile的字段不會被編譯器和處理器優化爲只能被單個線程訪問。這確保了該字段老是最新的值。
而後咱們使用了SpinWait版本,用於在每一個迭代打印一個特殊標誌位來顯示線程是否切換爲阻塞狀態。運行該線程5毫秒來查看結果。剛開始, SpinWait嘗試使用用戶模式,在9 個迭代後,開始切換線程爲阻塞狀態。若是嘗試測量該版本的CPU負載,在Windows任務管理器將不會看到任何CPU的使用。
在以前的章節中咱們討論了建立線程和線程協做的幾種方式。如今考慮另外一種狀況,即只花費極少的時間來完成建立不少異步操做。建立線程是昂貴的操做,因此爲每一個短暫的異步操做建立線程會產生顯著的開銷。
爲了解決該問題,有一個經常使用的方式叫作池( pooling),線程池能夠成功地適應於任何須要大量短暫的開銷大的資源的情形。咱們事先分配必定的資源,將這些資源放入到資源池。每次須要新的資源,只需從池中獲取一個,而不用建立一個新的。當該資源再也不被使用,時,就將其返回到池中。
.NET線程池是該概念的一種實現。經過System.Threading.ThreadPool類型能夠使用線程池。線程池是受,NET通用語言運行時( Common Language Runtime,簡稱CLR)管理的。這意味着每一個CLR都有一個線程池實例。ThreadPool類型擁有一個QueueUserWorkItem靜態方法。該靜態方法接受一個委託,表明用戶自定義的一個異步操做。在該方法被調用後,委,託會進入到內部隊列中。若是池中沒有任何線程,將建立一個新的工做線程( worker thread) 並將隊列中第一個委託放入到該工做線程中。若是想線程池中放入新的操做,當以前的全部操做完成後,極可能只需重用一個線程來執行這些新的操做。然而,若是放置新的操做過快,線程池將建立更多的線程來執行這些操,做。建立太多的線程是有限制的,在這種狀況下新的操做將在隊列中等待直到線程池中的工做線程有能力來執行它們。
當中止向線程池中放置新操做時,線程池最終會刪除必定時間後過時的再也不使用的線程。這將釋放全部那些再也不須要的系統資源。我想再次強調線程池的用途是執行運行時間短的操做。使用線程池能夠減小並行度耗費,及節省操做系統資源。
咱們只使用較少的線程,可是以比日常更慢的速度來執行異步操做, ,使用必定數量的可用的工做線程批量處理這些操做。若是操做能快速地完成則比較適用線程!池,可是執行長時間運行的計算密集型操做則會下降性能。
另外一個重要事情是在ASPNET應用程序中使用線程池時要至關當心。ASPNET基礎設施使用本身的線程池,若是在線程池中浪費全部的工做線程, Web服務器將不可以服務新的請求。在ASPNET中只推薦使用輸入/輸出密集型的異步操做,由於其使用了一個不一樣的方式,叫作IO線程。
在本章中,咱們將學習使用線程池來執行異步操做。本章將覆蓋將操做放入線程池的不,,同方式,以及如何取消一個操做,並防止其長時間運行。
保持線程中的操做都是短暫的是很是重要的。不要在線程池中放入長時間運行的操做,或者阻塞工做線程。這將致使全部工做線程變得繁忙,從而沒法服務用戶操做。這會致使性能問題和很是難以調試的錯誤。
請注意線程池中的工做線程都是後臺線程。這意味着當全部的前臺線程(包括主程序線程)完成後,全部的後臺線程將中止工做。
本節將展現在線程池中如何異步的執行委託。另外,咱們將討論一個叫作異步編程模型(Asynchronous Programming Model,簡稱APM)的方式,這是NET歷史中第一個異步編程模式。
class Program { static void Main(string[] args) { int threadId = 0; RunOnThreadPool poolDelegate = Test; var t = new Thread(() => Test(out threadId)); t.Start(); t.Join(); Console.WriteLine("Thread id: {0}", threadId); IAsyncResult r = poolDelegate.BeginInvoke(out threadId, Callback, "a delegate asynchronous call"); r.AsyncWaitHandle.WaitOne(); string result = poolDelegate.EndInvoke(out threadId, r); Console.WriteLine("Thread pool worker thread id: {0}", threadId); Console.WriteLine(result); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.ReadKey(); } private delegate string RunOnThreadPool(out int threadId); private static void Callback(IAsyncResult ar) { Console.WriteLine("Starting a callback..."); Console.WriteLine("State passed to a callbak: {0}", ar.AsyncState); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Console.WriteLine("Thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); } private static string Test(out int threadId) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); threadId = Thread.CurrentThread.ManagedThreadId; return string.Format("Thread pool worker thread id was: {0}", threadId); } }
當程序運行時,使用舊的方式建立了一個線程,而後啓動它並等待完成。因爲線程的構造函數只接受一個無任何返回結果的方法,咱們使用了lambda表達式來將對Test方法的調用包起來。咱們經過打印出Thread. CurrentThread.IsThreadPoolThread屬性值來確,保該線程不是來自線程池。咱們也打印出了受管理的線程ID來識別代碼是被哪一個線程執行的。
而後定義了一個委託並調用Beginlnvoke方法來運行該委託。BeginInvoke方法接受一個回調函數。該回調函數會在異步操做完成後會被調用,而且一個用戶自定義的狀態會傳給該回調函數。該狀態一般用於區分異步調用。結果,咱們獲得了一個實現了IAsyncResult接口的result對象。BeginInvoke當即返回告終果,當線程池中的工做線程在執行異步操做時,仍容許咱們繼續其餘工做。當須要異步操做的結果時,能夠使用BeginInvoke方法調用返回的result對象。咱們能夠使用result對象的IsCompleted屬性輪詢結果。可是在本例子中,使用的是AsyncWaitHandle屬性來等待直到操做完成。當操做完成後,會獲得一個結果,能夠經過委託調用EndInvoke方法,將IAsyncResult對象傳遞給委託參數。
事實上使用AsyncWaitHandle並非必要的。若是註釋掉r.AsyncWaitHandle.WaitOne,代碼照樣能夠成功運行, 由於EndInvoke方法事實上會等待異步操做完成。調用 "EndInvoke方法(或者針對其餘異步API的EndOperationName方法)是很是重要的, '由於該方法會將任何未處理的異常拋回到調用線程中。當使用這種異步API時,請確保始終調用了Begin和End方法。
當操做完成後,傳遞給BeginInvoke方法的回調函數將被放置到線程池中,確切地說是,一個工做線程中。若是在Main方法定義的結尾註釋掉Thread.Sleep方法調用,回調函數將不,會被執行。這是由於當主線程完成後,全部的後臺線程會被中止,包括該回調函數。對委託和回調函數的異步調用極可能會被同一個工做線程執行。經過工做線程ID能夠容易地看出。使用BeginOperationName/EndOperationName方法和.NET中的IAsyncResult對象等方 ,式被稱爲異步編程模型(或APM模式),這樣的方法對被稱爲異步方法。該模式也被應用於多個,NET類庫的API中,但在現代編程中,更推薦使用任務並行庫( Task Parallel Library,簡稱TPL)來組織異步API
class Program { static void Main(string[] args) { const int x = 1; const int y = 2; const string lambdaState = "lambda state 2"; ThreadPool.QueueUserWorkItem(AsyncOperation); Thread.Sleep(TimeSpan.FromSeconds(1)); ThreadPool.QueueUserWorkItem(AsyncOperation, "async state"); Thread.Sleep(TimeSpan.FromSeconds(1)); ThreadPool.QueueUserWorkItem( state => { Console.WriteLine("Operation state: {0}", state); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); }, "lambda state"); ThreadPool.QueueUserWorkItem( _ => { Console.WriteLine("Operation state: {0}, {1}", x+y, lambdaState); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); }, "lambda state"); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.ReadKey(); } private static void AsyncOperation(object state) { Console.WriteLine("Operation state: {0}", state ?? "(null)"); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); } }
首先定義了AsyncOperation方法,其接受單個object類型的參數。而後使用QueueUser WorkItem方法將該方法放到線程池中。接着再次放入該方法,可是此次給方法調用傳入了一個狀態對象。該對象將做爲狀態參數傳遞給AsynchronousOperation方法。
在操做完成後讓線程睡眠一秒鐘,從而讓線程池擁有爲新操做重用線程的可能性。若是註釋掉全部的Thread.Sleep調用,那麼全部打印出的線程ID多半是不同的。若是ID是同樣的,那極可能是前兩個線程被重用來運行接下來的兩個操做。
首先將一個lambda表達式放置到線程池中。這裏沒什麼特別的。咱們使用了labmbda表達式語法,從而無須定義一個單獨的方法。
而後,咱們使用閉包機制,從而無須傳遞lambda表達式的狀態。閉包更靈活,容許我,們向異步操做傳遞一個以上的對象並且這些對象具備靜態類型。因此以前介紹的傳遞對象給,方法回調的機制既冗餘又過期。在C#中有了閉包後就再也不須要使用它了。
本節將展現線程池如何工做於大量的異步操做,以及它與建立大量單獨的線程的方式有何不一樣。
class Program { static void Main(string[] args) { const int numberOfOperations = 500; var sw = new Stopwatch(); sw.Start(); UseThreads(numberOfOperations); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); sw.Reset(); sw.Start(); UseThreadPool(numberOfOperations); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); Console.ReadKey(); } static void UseThreads(int numberOfOperations) { using (var countdown = new CountdownEvent(numberOfOperations)) { Console.WriteLine("Scheduling work by creating threads"); for (int i = 0; i < numberOfOperations; i++) { var thread = new Thread(() => { Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(0.1)); countdown.Signal(); }); thread.Start(); } countdown.Wait(); Console.WriteLine(); } } static void UseThreadPool(int numberOfOperations) { using (var countdown = new CountdownEvent(numberOfOperations)) { Console.WriteLine("Starting work on a threadpool"); for (int i = 0; i < numberOfOperations; i++) { ThreadPool.QueueUserWorkItem( _ => { Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(0.1)); countdown.Signal(); }); } countdown.Wait(); Console.WriteLine(); } } }
當主程序啓動時,建立了不少不一樣的線程,每一個線程都運行一個操做。該操做打印出線,程ID並阻塞線程100毫秒。結果咱們建立了500個線程,所有並行運行這些操做。雖然在,個人機器上的總耗時是300毫秒,可是全部線程消耗了大量的操做系統資源。
而後咱們使用了執行一樣的任務,只不過不爲每一個操做建立一個線程,而將它們放入到線程池中。而後線程池開始執行這些操做。線程池在快結束時建立更多的線程,可是仍然花,費了更多的時間,在我機器上是12秒。咱們爲操做系統節省了內存和線程數,可是爲此付,出了更長的執行時間。
.本節將經過一個示例來展現如何在線程池中取消異步操做。
class Program { static void Main(string[] args) { using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation1(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation2(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } using (var cts = new CancellationTokenSource()) { CancellationToken token = cts.Token; ThreadPool.QueueUserWorkItem(_ => AsyncOperation3(token)); Thread.Sleep(TimeSpan.FromSeconds(2)); cts.Cancel(); } Thread.Sleep(TimeSpan.FromSeconds(2)); } static void AsyncOperation1(CancellationToken token) { Console.WriteLine("Starting the first task"); for (int i = 0; i < 5; i++) { if (token.IsCancellationRequested) { Console.WriteLine("The first task has been canceled."); return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The first task has completed succesfully"); } static void AsyncOperation2(CancellationToken token) { try { Console.WriteLine("Starting the second task"); for (int i = 0; i < 5; i++) { token.ThrowIfCancellationRequested(); Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The second task has completed succesfully"); } catch (OperationCanceledException) { Console.WriteLine("The second task has been canceled."); } } private static void AsyncOperation3(CancellationToken token) { bool cancellationFlag = false; token.Register(() => cancellationFlag = true); Console.WriteLine("Starting the third task"); for (int i = 0; i < 5; i++) { if (cancellationFlag) { Console.WriteLine("The third task has been canceled."); return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine("The third task has completed succesfully"); } }
本節中介紹了CancellationTokenSource和CancellationToken兩個新類。它們在.NET4.0被引人, 目前是實現異步操做的取消操做的事實標準。因爲線程池已經存在了很長時間,並,沒有特殊的API來實現取消標記功能,可是仍然能夠對線程池使用上述API。
在本程序中使用了三種方式來實現取消過程。第一個是輪詢來檢查CancellationToken.IsCancellationRequested屬性。若是該屬性爲true,則說明操做須要被取消,咱們必須放棄該操做。
第二種方式是拋出一個OperationCancelledException異常。這容許在操做以外控制取消過程,即須要取消操做時,經過操做以外的代碼來處理。
最後一種方式是註冊一個回調函數。當操做被取消時,在線程池將調用該回調函數。這容許鏈式傳遞一個取消邏輯到另外一個異步操做中。
本節將描述如何在線程池中對操做實現超時,以及如何在線程池中正確地等待。
class Program { static void Main(string[] args) { RunOperations(TimeSpan.FromSeconds(5)); RunOperations(TimeSpan.FromSeconds(7)); } static void RunOperations(TimeSpan workerOperationTimeout) { using (var evt = new ManualResetEvent(false)) using (var cts = new CancellationTokenSource()) { Console.WriteLine("Registering timeout operations..."); var worker = ThreadPool.RegisterWaitForSingleObject(evt, (state, isTimedOut) => WorkerOperationWait(cts, isTimedOut), null, workerOperationTimeout, true); Console.WriteLine("Starting long running operation..."); ThreadPool.QueueUserWorkItem(_ => WorkerOperation(cts.Token, evt)); Thread.Sleep(workerOperationTimeout.Add(TimeSpan.FromSeconds(2))); worker.Unregister(evt); } } static void WorkerOperation(CancellationToken token, ManualResetEvent evt) { for(int i = 0; i < 6; i++) { if (token.IsCancellationRequested) { return; } Thread.Sleep(TimeSpan.FromSeconds(1)); } evt.Set(); } static void WorkerOperationWait(CancellationTokenSource cts, bool isTimedOut) { if (isTimedOut) { cts.Cancel(); Console.WriteLine("Worker operation timed out and was canceled."); } else { Console.WriteLine("Worker operation succeded."); } } }
線程池還有一個有用的方法: ThreadPool.RegisterWaitForSingleObject,該方法容許咱們將回調函數放入線程池中的隊列中。當提供的等待事件處理器收到信號或發生超時時,該回調函數將被調用。這容許咱們爲線程池中的操做實現超時功能。
首先按順序向線程池中放入一個耗時長的操做。它運行6秒鐘而後一旦成功完成,會設置一個ManualResetEvent信號類。其餘的狀況下,好比須要取消操做,則該操做會被丟棄。 .
而後咱們註冊了第二個異步操做。當從ManualResetEvent對象接受到一個信號後,該異步操做會被調用。若是第一個操做順利完成,會設置該信號量。另外一種狀況是第一個操做還未完成就已經超時。若是發生了該狀況,咱們會使用CancellationToken來取消第一個操做。
最後,爲操做提供5秒的超時時間是不夠的。這是由於操做會花費6秒來完成,只能取消該操做。因此若是提供7秒的超時時間是可行的,該操做會順利完成。
當有大量的線程必須處於阻塞狀態中等待一些多線程事件發信號時,以上方式很是有,用。藉助於線程池的基礎設施,咱們無需阻塞全部這樣的線程。能夠釋放這些線程直到信號事件被設置。在服務器端應用程序中這是個很是重要的應用場景,由於服務器端應用程序要求高伸縮性及高性能。
本節將描述如何使用System.Threading. Timer對象來在線程池中建立週期性調用的異步
class Program { static void Main(string[] args) { Console.WriteLine("Press 'Enter' to stop the timer..."); DateTime start = DateTime.Now; _timer = new Timer(_ => TimerOperation(start), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2)); Thread.Sleep(TimeSpan.FromSeconds(6)); _timer.Change(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(4)); Console.ReadLine(); _timer.Dispose(); Console.ReadKey(); } static Timer _timer; static void TimerOperation(DateTime start) { TimeSpan elapsed = DateTime.Now - start; Console.WriteLine("{0} seconds from {1}. Timer thread pool thread id: {2}", elapsed.Seconds, start,Thread.CurrentThread.ManagedThreadId); } }
咱們首先建立了一個Timer實例。第一個參數是一個1ambda表達式,將會在線程池中被執行。咱們調用TimerOperation方法並給其提供一個起始時間。因爲無須使用用戶狀態對象,因此第二個參數爲null,而後指定了何時會第一次運行TimerOperation,以及以後 "再次調用的間隔時間。因此第一個值實際上說明一秒後會啓動第一次操做,而後每隔兩秒再,次運行。
以後等待6秒後修改計時器。在調用timer.Change方法一秒後啓動TimerOperation,而後每隔4秒再次運行。
計時器還能夠更復雜:能夠以更復雜的方式使用計時器。好比,能夠經過Timeout.Infinet值提供給計時器個間隔參數來只容許計時器操做一次。而後在計時器異步操做內,可以設置下一次計,時器操做將被執行的時間。具體時間取決於自定義業務邏輯。
class Program { static void Main(string[] args) { var bw = new BackgroundWorker(); bw.WorkerReportsProgress = true; bw.WorkerSupportsCancellation = true; bw.DoWork += Worker_DoWork; bw.ProgressChanged += Worker_ProgressChanged; bw.RunWorkerCompleted += Worker_Completed; bw.RunWorkerAsync(); Console.WriteLine("Press C to cancel work"); do { if (Console.ReadKey(true).KeyChar == 'C') { bw.CancelAsync(); } } while(bw.IsBusy); } static void Worker_DoWork(object sender, DoWorkEventArgs e) { Console.WriteLine("DoWork thread pool thread id: {0}", Thread.CurrentThread.ManagedThreadId); var bw = (BackgroundWorker) sender; for (int i = 1; i <= 100; i++) { if (bw.CancellationPending) { e.Cancel = true; return; } if (i%10 == 0) { bw.ReportProgress(i); } Thread.Sleep(TimeSpan.FromSeconds(0.1)); } e.Result = 42; } static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e) { Console.WriteLine("{0}% completed. Progress thread pool thread id: {1}", e.ProgressPercentage, Thread.CurrentThread.ManagedThreadId); } static void Worker_Completed(object sender, RunWorkerCompletedEventArgs e) { Console.WriteLine("Completed thread pool thread id: {0}", Thread.CurrentThread.ManagedThreadId); if (e.Error != null) { Console.WriteLine("Exception {0} has occured.", e.Error.Message); } else if (e.Cancelled) { Console.WriteLine("Operation has been canceled."); } else { Console.WriteLine("The answer is: {0}", e.Result); } } }
當程序啓動時,建立了一個BackgroundWorker組件的實例。顯式地指出該後臺工做線,程支持取消操做及該操做進度的通知。
接下來是最有意思的部分。咱們沒有使用線程池和委託,而是使用了另外一個C#語法,稱爲事件。事件表示了一些通知的源或當通知到達時會有所響應的一系列訂閱者。在本例中,咱們將訂閱三個事件,當這些事件發生時,將調用相應的事件處理器。當事件通知其訂,閱者時,具備特殊的定義簽名的方法將被調用。
所以,除了將異步API組織爲Begin/End方法對,還能夠只啓動一個異步操做而後訂閱給不一樣的事件。這些事件在該操做執行時會被觸發。這種方式被稱爲基於事件的異步模式, ( Event-based Asynchronous Pattern,簡稱EAP)。這是歷史上第二種用來構造異步程序的方,式,如今更推薦使用TPL
咱們共定義了三個事件。第一個是oWork事件。當一個後臺工做對象經過RunWorkerAsync方法啓動一個異步操做時,該事件處理器將被調用。該事件處理器將會運行在線程池中。若是須要取消操做,則這裏是主要的操做點來取消執行。同時也能夠提供該操做的運行進程信,息。最後,獲得結果後,將結果設置給事件參數,而後RunWorkerCompleted事件處理器將,被調用。在該方法中,能夠知道操做是成功完成,仍是發生錯誤,抑或被取消。
基於此, BackgroundWorker組件實際上被使用於Windows窗體應用程序(Windows Forms Applications,簡稱WPF)中。該實現經過後臺工做事件處理器的代碼能夠直接與UI控制器交互。與線程池中的線程與UI控制器交互的方式相比較,使用BackgroundWorker組件的方式更加天然和好用。
咱們在以前的章節中學習了什麼是線程,如何使用線程,以及爲何須要線程池。使用線程池能夠使咱們在減小並行度花銷時節省操做系統資源。咱們能夠認爲線程池是一個抽象層,其向程序員隱藏了使用線程的細節,使咱們專心處理程序邏輯,而不是各類線程,問題。
然而使用線程池也至關複雜。從線程池的工做線程中獲取結果並不容易。咱們須要實現,自定義方式來獲取結果,並且萬一有異常發生,還需將異常正確地傳播到初始線程中。除此,之外,建立一組相關的異步操做,以及實現當前操做執行完成後下一操做纔會執行的邏輯也不容易。在嘗試解決這些問題的過程當中,建立了異步編程模型及基於事件的異步模式。在第3章中提到過基於事件的異步模式。這些模式使得獲取結果更容易,傳播異常也更輕鬆,可是組,合多個異步操做仍需大量工做,須要編寫大量的代碼。
爲了解決全部的問題, Net Framework4.0引入了一個新的關於異步操做的API,它叫作.任務並行庫( Task Parallel Library,簡稱TPL), .Net Framework 4.5版對該API進行了輕微的改進,使用更簡單。在本書的項目中將使用最新版的TPL,即.Net Framework 4.5版中的 API, TPL可被認爲是線程池之上的又一個抽象層,其對程序員隱藏了與線程池交互的底層代碼,並提供了更方便的細粒度的APL, TPL的核心概念是任務。一個任務表明了一個異步操做,該操做能夠經過多種方式運行,能夠使用或不使用獨立線程運行。在本章中將探究任務的全部使用細節。
默認狀況下,程序員無須知道任務其實是如何執行的。TPL經過向用戶隱藏任務的實現細節從而建立一個抽象層。遺憾的是,有些狀況下這會致使詭祕的錯誤,好比試圖獲取任務的結果時程序被掛起。本章有助於理解TPL底層的原理,以及如何避免不恰當的使用方式。
一個任務能夠經過多種方式和其餘任務組合起來。例如,能夠同時啓動多個任務,等待全部任務完成,而後運行一個任務對以前全部任務的結果進行一些計算。TPL與以前的模式相比,其中一個關鍵優點是其具備用於組合任務的便利的API,
處理任務中的異常結果有多種方式。因爲一個任務可能會由多個其餘任務組成,這些任,務也可能依次擁有各自的子任務,因此有一個AggregateException的概念。這種異常能夠捕獲底層任務內部的全部異常,並容許單獨處理這些異常。
並且,最後但並非最不重要的, C# 5.0已經內置了對TPL的支持,容許咱們使用新的 await和async關鍵字以平滑的、舒服的方式操做任務。
在本章中咱們將學習使用TPL來執行異步操做。咱們將學習什麼是任務,如何用不一樣的,方式建立任務,以及如何將任務組合在一塊兒。咱們會討論如何將遺留的APM和EAP模式轉換爲使用任務,還有如何正確地處理異常,如何取消任務,以及如何使多個任務同時執行。另外,還將講述如何在Windows GUI應用程序中正確地使用任務。
class Program { static void Main(string[] args) { var t1 = new Task(() => TaskMethod("Task 1")); var t2 = new Task(() => TaskMethod("Task 2")); t2.Start(); t1.Start(); Task.Run(() => TaskMethod("Task 3")); Task.Factory.StartNew(() => TaskMethod("Task 4")); Task.Factory.StartNew(() => TaskMethod("Task 5"), TaskCreationOptions.LongRunning); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } static void TaskMethod(string name) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
當程序運行時,咱們使用Task的構造函數建立了兩個任務。咱們傳入一個lambda表達式做爲Action委託。這能夠使咱們給TaskMethod提供一個string參數。而後使用Start方法運行這些任務。
請注意只有調用了這些任務的Start方法,纔會執行任務。很容易忘記真正啓動任務。
而後使用Task.Run和Task.Factory.StartNew方法來運行了另外兩個任務。與使用Task構造函數的不一樣之處在於這兩個被建立的任務會當即開始工做,因此無需顯式地調用這些任務的Start方法。從Task 1到Task 4的全部任務都被放置在線程池的工做線程中並以未指定,的順序運行。若是屢次運行該程序,就會發現任務的執行順序是不肯定的。
Task.Run方法只是Task.Factory.StartNew的一個快捷方式,可是後者有附加的選項。通!常若是無特殊需求,則可以使用前一個方法,如Task 5所示。咱們標記該任務爲長時間運行,結果該任務將不會使用線程池,而在單獨的線程中運行。然而,根據運行該任務的當前的任務調度程序( task scheduler)運行方式有可能不一樣。
本節將描述如何從任務中獲取結果值。咱們將經過幾個場景來了解在線程池中和主線程中運行任務的不一樣之處。
class Program { static void Main(string[] args) { TaskMethod("Main Thread Task"); Task<int> task = CreateTask("Task 1"); task.Start(); int result = task.Result; Console.WriteLine("Result is: {0}", result); task = CreateTask("Task 2"); task.RunSynchronously(); result = task.Result; Console.WriteLine("Result is: {0}", result); task = CreateTask("Task 3"); Console.WriteLine(task.Status); task.Start(); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); result = task.Result; Console.WriteLine("Result is: {0}", result); Console.ReadKey(); } static Task<int> CreateTask(string name) { return new Task<int>(() => TaskMethod(name)); } static int TaskMethod(string name) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); return 42; } }
首先直接運行TaskMethod方法,這裏並無把它封裝到一個任務中。結果根據它提供給咱們的主線程的信息能夠得知該方法是被同步執行的。很顯然它不是線程池中的線程。
而後咱們運行了Task 1,使用Start方法啓動該任務並等待結果。該任務會被放置在線程池中,而且主線程會等待,直到任務返回前一直處於阻塞狀態。
Task 2和Task 1相似,除了Task 2是經過RunSynchronously()方法運行的。該任務會運行在主線程中,該任務的輸出與第一個例子中直接同步調用TaskMethod的輸出徹底同樣。這是個很是好的優化,能夠避免使用線程池來執行很是短暫的操做。
咱們用以運行Task 1相同的方式來運行Task 3,但此次沒有阻塞主線程,只是在該任務完成前循環打印出任務狀態。結果展現了多種任務狀態,分別是Creatd, Running和 RanToCompletion.
本節將展現如何設置相互依賴的任務。咱們將學習如何建立一個任務,使其在父任務完成後纔會被運行。另外,將探尋爲很是短暫的任務節省線程開銷的可能性。
class Program { static void Main(string[] args) { var firstTask = new Task<int>(() => TaskMethod("First Task", 3)); var secondTask = new Task<int>(() => TaskMethod("Second Task", 2)); firstTask.ContinueWith( t => Console.WriteLine("The first answer is {0}. Thread id {1}, is thread pool thread: {2}", t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread), TaskContinuationOptions.OnlyOnRanToCompletion); firstTask.Start(); secondTask.Start(); Thread.Sleep(TimeSpan.FromSeconds(4)); Task continuation = secondTask.ContinueWith( t => Console.WriteLine("The second answer is {0}. Thread id {1}, is thread pool thread: {2}", t.Result, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread), TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously); continuation.GetAwaiter().OnCompleted( () => Console.WriteLine("Continuation Task Completed! Thread id {0}, is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread)); Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine(); firstTask = new Task<int>(() => { var innerTask = Task.Factory.StartNew(() => TaskMethod("Second Task", 5), TaskCreationOptions.AttachedToParent); innerTask.ContinueWith(t => TaskMethod("Third Task", 2), TaskContinuationOptions.AttachedToParent); return TaskMethod("First Task", 2); }); firstTask.Start(); while (!firstTask.IsCompleted) { Console.WriteLine(firstTask.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(firstTask.Status); Thread.Sleep(TimeSpan.FromSeconds(10)); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
當主程序啓動時,咱們建立了兩個任務,併爲第一個任務設置了一個後續操做( continuation,一個代碼塊,會在當前任務完成後運行),而後啓動這兩個任務並等待4秒,這個時間足夠兩個任務完成。而後給第二個任務運行另外一個後續操做,並經過指定TaskContinuationOptions."ExecuteSynchronously選項來嘗試同步執行該後續操做。若是後續操做耗時很是短暫,使用以上方式是很是有用的,由於放置在主線程中運行比放置在線程池中運行要快。能夠實現這一點是由於第二個任務剛好在那刻完成。若是註釋掉4秒的Thread.Sleep方法,將會看到該代碼被放置到線程池中,這是由於還未從以前的任務中獲得結果。
最後咱們爲以前的後續操做也定義了一個後續操做,但這裏使用了一個稍微不一樣的方式,即便用了新的GetAwaiter和OnCompleted方法。這些方法是C# 5.0語言中異步機制中的方法。
本節示例的最後部分與父子線程有關。咱們建立了一個新任務,當運行該任務時,經過提供一個TaskCreationOptions.AttachedToParent選項來運行一個所謂的子任務。
子任務必須在父任務運行時建立,並正確的附加給父任務!
這意味着只有全部子任務結束工做,父任務纔會完成。經過提供一個TaskContinuation Options選項也能夠給在子任務上運行後續操做。該後續操做也會影響父任務,而且直到最後一個子任務結束它纔會運行完成。
本節將說明如何將過期的APM API轉換爲任務。多個示例覆蓋了轉換過程當中可能發生的不一樣狀況。
class Program { private static void Main(string[] args) { int threadId; AsynchronousTask d = Test; IncompatibleAsynchronousTask e = Test; Console.WriteLine("Option 1"); Task<string> task = Task<string>.Factory.FromAsync( d.BeginInvoke("AsyncTaskThread", Callback, "a delegate asynchronous call"), d.EndInvoke); task.ContinueWith(t => Console.WriteLine("Callback is finished, now running a continuation! Result: {0}", t.Result)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.WriteLine("----------------------------------------------"); Console.WriteLine(); Console.WriteLine("Option 2"); task = Task<string>.Factory.FromAsync( d.BeginInvoke, d.EndInvoke, "AsyncTaskThread", "a delegate asynchronous call"); task.ContinueWith(t => Console.WriteLine("Task is completed, now running a continuation! Result: {0}", t.Result)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.WriteLine("----------------------------------------------"); Console.WriteLine(); Console.WriteLine("Option 3"); IAsyncResult ar = e.BeginInvoke(out threadId, Callback, "a delegate asynchronous call"); task = Task<string>.Factory.FromAsync(ar, _ => e.EndInvoke(out threadId, ar)); task.ContinueWith(t => Console.WriteLine("Task is completed, now running a continuation! Result: {0}, ThreadId: {1}", t.Result, threadId)); while (!task.IsCompleted) { Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(0.5)); } Console.WriteLine(task.Status); Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } private delegate string AsynchronousTask(string threadName); private delegate string IncompatibleAsynchronousTask(out int threadId); private static void Callback(IAsyncResult ar) { Console.WriteLine("Starting a callback..."); Console.WriteLine("State passed to a callbak: {0}", ar.AsyncState); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Console.WriteLine("Thread pool worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); } private static string Test(string threadName) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); Thread.CurrentThread.Name = threadName; return string.Format("Thread name: {0}", Thread.CurrentThread.Name); } private static string Test(out int threadId) { Console.WriteLine("Starting..."); Console.WriteLine("Is thread pool thread: {0}", Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(2)); threadId = Thread.CurrentThread.ManagedThreadId; return string.Format("Thread pool worker thread id was: {0}", threadId); } }
這裏咱們定義了兩種委託。其中一個使用了out參數,所以在將APM模式轉換爲任務,時,與標準的TPLAPI是不兼容的。這樣的轉換有三個示例。
將APM轉換爲TPL的關鍵點是Task<T>.Factory.FromAsync方法, T是異步操做結果的類型。該方法有數個重載。在第一個例子中傳人了IAsyncResult和Func<lAsyncResult, string?,這是一個將IAsyncResult的實現做爲參數並返回一個字符串的方法。因爲第一個委託類型提供的EndMethod與該簽名是兼容的,因此將該委託的異步調用轉換爲任務沒有任何問題。
第二個例子作的事與第一個很是類似,可是使用了不一樣的FromAsync方法重載,該重載 ,並不容許指定一個將會在異步委託調用完成後被調用的回調函數。但咱們能夠使用後續操做,替代它。但若是回調函數很重要,能夠使用第一個例子所示的方法。
最後一個例子展現了一個小技巧。此次IncompatibleAsynchronousTask委託的 EndMethod使用了out參數,與FromAsync方法重載並不兼容。然而,能夠很容易地將 EndMethod調用封裝到一個lambda表達式中,從而適合任務工廠方法。
能夠在等待異步操做結果過程當中打印出任務狀態,從而瞭解底層任務的運行狀況。能夠看到第一個任務的狀態爲WaitingForActivation,這意味着TPL基礎設施實際上還未啓動該任務。
本節將描述如何將基於事件的異步操做轉換爲任務。在本節中,你將發現有一個可靠的模式可適用於.Net Framework類庫中的全部基於事件的異步API.
class Program { static void Main(string[] args) { var tcs = new TaskCompletionSource<int>(); var worker = new BackgroundWorker(); worker.DoWork += (sender, eventArgs) => { eventArgs.Result = TaskMethod("Background worker", 5); }; worker.RunWorkerCompleted += (sender, eventArgs) => { if (eventArgs.Error != null) { tcs.SetException(eventArgs.Error); } else if (eventArgs.Cancelled) { tcs.SetCanceled(); } else { tcs.SetResult((int)eventArgs.Result); } }; worker.RunWorkerAsync(); int result = tcs.Task.Result; Console.WriteLine("Result is: {0}", result); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
這是一個將EAP模式轉換爲任務的既簡單又優美的示例。關鍵點在於使用TaskCompletionSource<T>類型, T是異步操做結果類型。
不要忘記將tcs.SetResult調用封裝在try-catch代碼塊中,從而保證錯誤信息始終會設置給任務完成源對象。也能夠使用TrySetResult方法來替代SetResult方法,以保證結果能被成功設置。
本節是關於如何給基於任務的異步操做實現取消流程。咱們將學習如何正確的使用取消標誌,以及在任務真正運行前如何得知其是否被取消。
class Program { private static void Main(string[] args) { var cts = new CancellationTokenSource(); var longTask = new Task<int>(() => TaskMethod("Task 1", 10, cts.Token), cts.Token); Console.WriteLine(longTask.Status); cts.Cancel(); Console.WriteLine(longTask.Status); Console.WriteLine("First task has been cancelled before execution"); cts = new CancellationTokenSource(); longTask = new Task<int>(() => TaskMethod("Task 2", 10, cts.Token), cts.Token); longTask.Start(); for (int i = 0; i < 5; i++ ) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine(longTask.Status); } cts.Cancel(); for (int i = 0; i < 5; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); Console.WriteLine(longTask.Status); } Console.WriteLine("A task has been completed with result {0}.", longTask.Result); Console.ReadKey(); } private static int TaskMethod(string name, int seconds, CancellationToken token) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); for (int i = 0; i < seconds; i ++) { Thread.Sleep(TimeSpan.FromSeconds(1)); if (token.IsCancellationRequested) return -1; } return 42*seconds; } }
第3章中咱們已經討論了取消標誌概念,你已經至關熟悉了。而本節又是一個關於爲TPL任務實現取消選項的簡單例子。
首先仔細看看longTask的建立代碼。咱們將給底層任務傳遞一次取消標誌,而後給任務構造函數再傳遞一次。爲何須要提供取消標誌兩次呢?
答案是若是在任務實際啓動前取消它,該任務的TPL基礎設施有責任處理該取消操做,由於這些代碼根本不會執行。經過獲得的第一個任務的狀態能夠知道它被取消了。若是嘗試對該任務調用Start方法,將會獲得InvalidOperationException異常。
而後須要本身寫代碼來處理取消過程。這意味着咱們對取消過程全權負責,而且在取消,任務後,任務的狀態仍然是RanToCompletion,由於從TPL的視角來看,該任務正常完成了它的工做。辨別這兩種狀況是很是重要的,而且須要理解每種狀況下職責的不一樣。
本節將描述異步任務中處理異常這一重要的主題。咱們將討論任務中拋出異常的不一樣狀況及如何獲取這些異常信息
class Program { static void Main(string[] args) { Task<int> task; try { task = Task.Run(() => TaskMethod("Task 1", 2)); int result = task.Result; Console.WriteLine("Result: {0}", result); } catch (Exception ex) { Console.WriteLine("Exception caught: {0}", ex); } Console.WriteLine("----------------------------------------------"); Console.WriteLine(); try { task = Task.Run(() => TaskMethod("Task 2", 2)); int result = task.GetAwaiter().GetResult(); Console.WriteLine("Result: {0}", result); } catch (Exception ex) { Console.WriteLine("Exception caught: {0}", ex); } Console.WriteLine("----------------------------------------------"); Console.WriteLine(); var t1 = new Task<int>(() => TaskMethod("Task 3", 3)); var t2 = new Task<int>(() => TaskMethod("Task 4", 2)); var complexTask = Task.WhenAll(t1, t2); var exceptionHandler = complexTask.ContinueWith(t => Console.WriteLine("Exception caught: {0}", t.Exception), TaskContinuationOptions.OnlyOnFaulted ); t1.Start(); t2.Start(); Thread.Sleep(TimeSpan.FromSeconds(5)); Console.ReadKey(); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); throw new Exception("Boom!"); return 42 * seconds; } }
當程序啓動時,建立了一個任務並嘗試同步獲取任務結果。Result屬性的Get部分會使,當前線程等待直到該任務完成,並將異常傳播給當前線程。在這種狀況下,經過catch代碼塊能夠很容易地捕獲異常,可是該異常是一個被封裝的異常,叫作AggregateException。在本例中,它裏面包含一個異常,由於只有一個任務拋出了異常。能夠訪問InnerException屬性來獲得底層異常。
第二個例子與第一個很是類似,不一樣之處是使用GetAwaiter和GetResult方法來訪問任務結果。這種狀況下,無需封裝異常,由於TPL基礎設施會提取該異常。若是隻有一個底層,任務,那麼一次只能獲取一個原始異常,這種設計很是合適。
最後一個例子展現了兩個任務拋出異常的情形。如今使用後續操做來處理異常。只有以前,的任務完成前有異常時,該後續操做纔會被執行。經過給後續操做傳遞TaskContinuationOptions.OnlyOnFaulted選項能夠實現該行爲。結果打印出了AggregateException,其內部封裝了兩個任,務拋出的異常。
本節展現瞭如何同時運行多個異步任務。咱們將學習當全部任務都完成或任意一個任務,完成了工做時,如何高效地獲得通知。
class Program { static void Main(string[] args) { var firstTask = new Task<int>(() => TaskMethod("First Task", 3)); var secondTask = new Task<int>(() => TaskMethod("Second Task", 2)); var whenAllTask = Task.WhenAll(firstTask, secondTask); whenAllTask.ContinueWith(t => Console.WriteLine("The first answer is {0}, the second is {1}", t.Result[0], t.Result[1]), TaskContinuationOptions.OnlyOnRanToCompletion ); firstTask.Start(); secondTask.Start(); Thread.Sleep(TimeSpan.FromSeconds(4)); var tasks = new List<Task<int>>(); for (int i = 1; i < 4; i++) { int counter = i; var task = new Task<int>(() => TaskMethod(string.Format("Task {0}", counter), counter)); tasks.Add(task); task.Start(); } while (tasks.Count > 0) { var completedTask = Task.WhenAny(tasks).Result; tasks.Remove(completedTask); Console.WriteLine("A task has been completed with result {0}.", completedTask.Result); } Thread.Sleep(TimeSpan.FromSeconds(1)); Console.ReadKey(); } static int TaskMethod(string name, int seconds) { Console.WriteLine("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); Thread.Sleep(TimeSpan.FromSeconds(seconds)); return 42 * seconds; } }
當程序啓動時,建立了兩個任務。而後藉助於Task.WhenAll方法,建立了第三個任務,該任務將會在全部任務完成後運行。該任務的結果提供了一個結果數組,第一個元素是第.個任務的結果,第二個元素是第二個任務的結果,以此類推。
而後咱們建立了另一系列任務,並使用Task.WhenAny方法等待這些任務中的任何一 ,個完成。當有一個完成任務後,從列表中移除該任務並繼續等待其餘任務完成,直到列表爲, 4空。獲取任務的完成進展狀況或在運行任務時使用超時,均可以使用Task.WhenAny方法。例如,咱們等待一組任務運行,而且使用其中一個任務用來記錄是否超時。若是該任務先完,成,則只需取消掉其餘還未完成的任務。
一、新建一個C# WPF應用程序項目
二、在MainWindow.xaml文件中,將下面的標記代碼加入到一個網格元素中(即<Grid和<Grid>標籤間):
<TextBlock Name="ContentTextBlock" HorizontalAlignment="Left" Margin="44,134,0,0" VerticalAlignment="Top" Width="425" Height="40"/> <Button Content="Sync" HorizontalAlignment="Left" Margin="45,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonSync_Click"/> <Button Content="Async" HorizontalAlignment="Left" Margin="165,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonAsync_Click"/> <Button Content="Async OK" HorizontalAlignment="Left" Margin="285,190,0,0" VerticalAlignment="Top" Width="75" Click="ButtonAsyncOK_Click"/>
三、在MainWindow.xaml.cs文件中使用如下using指令;
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Input;
四、在MainWindow構造函數下面加入如下代碼片斷:
void ButtonSync_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; try { //string result = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext()).Result; string result = TaskMethod().Result; ContentTextBlock.Text = result; } catch (Exception ex) { ContentTextBlock.Text = ex.InnerException.Message; } } void ButtonAsync_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; Mouse.OverrideCursor = Cursors.Wait; Task<string> task = TaskMethod(); task.ContinueWith(t => { ContentTextBlock.Text = t.Exception.InnerException.Message; Mouse.OverrideCursor = null; }, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.FromCurrentSynchronizationContext()); } void ButtonAsyncOK_Click(object sender, RoutedEventArgs e) { ContentTextBlock.Text = string.Empty; Mouse.OverrideCursor = Cursors.Wait; Task<string> task = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext()); task.ContinueWith(t => Mouse.OverrideCursor = null, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.FromCurrentSynchronizationContext()); } Task<string> TaskMethod() { return TaskMethod(TaskScheduler.Default); } Task<string> TaskMethod(TaskScheduler scheduler) { Task delay = Task.Delay(TimeSpan.FromSeconds(5)); return delay.ContinueWith(t => { string str = string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); ContentTextBlock.Text = str; return str; }, scheduler); }
本例中引人了不少新鮮的東西。首先,建立了一個WPF應用程序,而不是一個命令行,程序。這是頗有必要的,由於咱們須要一個擁有消息循環的用戶界面線程來演示異步運行任,務的不一樣情形。
TaskScheduler是一個很是重要的抽象。該組件實際上負責如何執行任務。默認的任務調度程序將任務放置到線程池的工做線程中。這是很是常見的場景,因此TPL將其做爲默認選項並不用奇怪。咱們已經知道了如何同步運行任務,以及如何將任務附加到父任務上從而一塊兒運行。如今讓咱們看看使用任務的其餘方式。
當程序啓動時,建立了一個包含三個按鈕的窗口。第一個按鈕調用了一個同步任務的執行。該代碼被放置在ButtonSync Click方法中。當任務運行時,咱們甚至沒法移動應用程序,窗口。當用戶界面線程忙於運行任務時,整個用戶界面被徹底凍結,在任務完成前沒法響應任何消息循環。對於GUI窗口程序來講這是一個至關很差的實踐,咱們須要找到一個方式來,解決該問題 ,
第二個問題是咱們嘗試從其餘線程訪問UI控制器。圖形用戶界面控制器從沒有被設計,爲可被多線程使用,而且爲了不可能的錯誤,不容許從建立UI的線程以外的線程中訪問, U1組件。當咱們嘗試這樣作時,獲得了一個異常,該異常信息5秒後打印到了主窗口中。
爲了解決第一個問題,咱們嘗試異步運行任務。第二個按鈕就是這樣作的。該代碼被,.放置在ButtonAsync Click方法中。當使用調試模式運行該任務時,將會看到該任務被放置,在線程池中,最後將獲得一樣的異常。然而,當任務運行時用戶界面一直保持響應。這是好事,可是咱們仍須要除掉異常。
其實咱們已經解決了該問題。給TaskScheduler.FromCurrentSynchronizationContext選項提供一個後續操做用於輸出錯誤信息。若是不這樣作,咱們將沒法看到錯誤信息,由於可能會獲得在任務中產生的相同異常。該選項驅使TPL基礎設施給U1線程的後續操做中放入代碼,並藉助UI線程消息循環來異步運行該代碼。這解決了從其餘線程訪問UI控制器並仍保持U1處於響應狀態的問題。
爲了檢查是否真的是這樣,能夠按下最後一個按鈕來運行ButtonAsyncOK-Click方法中的代碼。與其他例子不一樣之處在於咱們將UI線程任務調度程序提供給了該任務。你將看到 ,任務以異步的方式運行在UI線程中。U1依然保持響應。甚至儘管等待光標處於激活狀態,你仍能夠按下另外一個按鈕,
然而使用U1線程運行任務有一些技巧。若是回到同步任務代碼,取消對使用UI線程任務調度程序獲取結果的代碼行的註釋,咱們將永遠得不到任何結果。這是一個經典的死鎖情,況:咱們在UI線程隊列中調度了一個操做, U1線程等待該操做完成,但當等待時,它又沒法運行該操做,這將永不會結束(甚至永不會開始),若是在任務中調用Wait方法也會發生死鎖。爲了不死鎖,絕對不要經過任務調度程序在U1線程中使用同步操做,請使用C# 5.0中的ContinueWith或async/await方法。
到如今爲止,咱們學習了任務並行庫,這是微軟提供的最新的異步編程基礎設施。它容許咱們以模塊化的方式設計程序,來組合不一樣的異步操做。
遺憾的是,當閱讀此類程序時仍然很是難理解程序的實際執行順序。在大型程序中將會,.有許多相互依賴的任務和後續操做,用於運行其餘後續操做的後續操做,處理異常的後續操,做,而且它們都出如今程序代碼中不一樣的地方。所以瞭解程序的前後執行次序變成了一個極具挑戰性的問題。
另外一個須要關注的問題是,可以接觸用戶界面控制器的每一個異步任務是否獲得了正確的,同步上下文。程序只容許經過UI線程使用這些控制器,不然將會獲得多線程訪問異常。
說到異常,咱們不得不使用單獨的後續操做任務來處理在以前的異步操做中發生的錯誤。這又致使了分散在代碼的不一樣部分的複雜的處理錯誤的代碼,邏輯上沒法相互關聯。
爲了解決這些問題, C#5.0的做者引入了新的語言特性,稱爲異步函數(asynchronous function),它是TPL之上的更高級別的抽象,真正簡化了異步編程。正如在第4章提到的,抽象隱藏了主要的實現細節,使得程序員無須考慮許多重要的事情,從而使異步編程更容易。瞭解異步函數背後的概念是很是重要的,有助於咱們編寫健壯的高擴展性的應用程序。
要建立一個異步函數,首先須要用async關鍵字標註一個方法。若是不先作這個,就不可能擁有async屬性或事件訪問方法和構造函數。代碼以下所示:
另外一個重要的事實是,異步函數必須返回Task或Task<T>類型。能夠使用async void方法,可是更推薦使用async Task方法。使用async void方法惟一合理的地方是在程序中使,用頂層UI控制器事件處理器的時候。
使用async關鍵字標註的方法內部,能夠使用await操做符。該操做符可與TPL的任務,一塊兒工做,並獲取該任務中異步操做的結果。在本章中稍後會講述細節。在async方法外不能使用await關鍵字,不然會有編譯錯誤。另外,異步函數在其代碼中至少要擁有一個await操做符。然而,若是沒有隻會致使編譯警告,而不是編譯錯誤。
須要注意的是,在執行完await調用的代碼行後該方法會當即返回。若是是同步執行,執行線程將會阻塞兩秒而後返回結果。這裏當執行完await操做後,當即將工做線程,放回線程池的過程當中,咱們會異步等待。2秒後,咱們又一次從線程池中獲得工做線程並繼續運行其中剩餘的異步方法。這容許咱們在等待2秒時重用工做線程作些其餘事,這對提升應用程序的可伸縮性很是重要。藉助於異步函數咱們擁有了線性的程序控制流,但它,的執行依然是異步的。這雖然好用,可是難以理解。本章將幫助你學習異步函數全部重要的方面。
以個人自身經驗而言,若是程序中有兩個連續的await操做符,此時程序如何工做有一個常見的誤解。不少人認爲若是在另外一個異步操做以後使用await函數,它們將會並行運行。然而,事實上它們是順序運行的,即第一個完成後第二個纔會開始運行。記住這一點很重要,在本章中稍後會覆蓋該細節。
在C# 5.0中關聯async和await有必定的限制。例如,不能把控制檯程序的Main方法標,記爲async,不能在catch, finally, lock或unsafe代碼塊中使用await操做符。不容許對任何異步函數使用ref或out參數。還有其餘微妙的地方,可是以上已經包括了主要的須要注意的,地方。
異步函數會被C#編譯器在後臺編譯成複雜的程序結構。這裏我不會說明該細節。生,成的代碼與另外一個C#構造很相似,稱爲迭代器。生成的代碼被實現爲一種狀態機。儘管不少程序員幾乎開始爲每一個方法使用async修飾符,我仍是想強調若是方法原本無需異步 ,或並行運行,那麼將該方法標註爲async是沒有道理的。調用async方法會有顯著的性能。損失,一般的方法調用比使用async關鍵字的一樣的方法調用要快上40~50倍。請注意這一點。
在本章中咱們將學習如何使用C# 5.0中的async和await關鍵字實現異步操做。本章將講述如何使用await按順序或並行地執行異步操做,還將討論如何在lambda表達式中使,用await,如何處理異常,以及在使用async void方法時如何避免陷阱。在本章結束前,咱們會深刻探究同步上下文傳播機制並學習如何建立自定義的awaitable對象,從而無需使用任務。
.本節將講述使用異步函數的基本場景。咱們將比較使用TPL和使用await操做符獲取異步操做結果的不一樣之處。
class Program { static void Main(string[] args) { Task t = AsynchronyWithTPL(); t.Wait(); t = AsynchronyWithAwait(); t.Wait(); Console.ReadKey(); } static Task AsynchronyWithTPL() { Task<string> t = GetInfoAsync("Task 1"); Task t2 = t.ContinueWith(task => Console.WriteLine(t.Result), TaskContinuationOptions.NotOnFaulted); Task t3 = t.ContinueWith(task => Console.WriteLine(t.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted); return Task.WhenAny(t2, t3); } async static Task AsynchronyWithAwait() { try { string result = await GetInfoAsync("Task 2"); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine(ex); } } async static Task<string> GetInfoAsync(string name) { await Task.Delay(TimeSpan.FromSeconds(2)); //throw new Exception("Boom!"); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
當程序運行時運行了兩個異步操做。其中一個是標準的TPL模式的代碼,第二個使用了 C#的新特性async和awaito。AsynchronyWithTPL方法啓動了一個任務,運行兩秒後返回關於工做線程信息的字符串。而後咱們定義了一個後續操做,用於在異步操做完成後打印出該 "操做結果,還有另外一個後續操做,用於萬一有錯誤發生時打印出異常的細節。最終,返回了一個表明其中一個後續操做任務的任務,並等待其在Main函數中完成。
在AsynchronyWithAwait方法中,咱們對任務使用await並獲得了相同的結果。這和編寫一般的同步代碼的風格同樣,即咱們獲取任務的結果,打印出結果,若是任務完成時帶有 "錯誤則捕獲異常。關鍵不一樣的是這其實是一個異步程序。使用await後, C#當即建立了一 1個任務,其有一個後續操做任務,包含了await操做符後面的全部剩餘代碼。這個新任務也處理了異常傳播。而後,將該任務返回到主方法中並等待其完成
請注意根據底層異步操做的性質和當前異步的上下文,執行異步代碼的具體方式可能會不一樣。稍後在本章中會解釋這一點。
所以能夠看到程序的第一部分和第二部分在概念上是等同的,可是在第二部分中C# ,編譯器隱式地處理了異步代碼。事實上,第二部分比第一部分更復雜,接下來咱們將講述,細節。
請記住在Windows GUI或ASPNET之類的環境中不推薦使用Task.Wait和Task.Result方法。若是程序員不是百分百地清楚代碼在作什麼,極可能會致使死鎖。在第4章的4.10節中,在WPF應用程序中使用Task.Result時已經演示了該一點。
請取消對GetInfoAsync方法的throw new Exception代碼行的註釋來測試異常處理是否工做。
本節將展現如何在lambda表達式中使用await,咱們將編寫一個使用了await的匿名方法,而且獲取異步執行該方法的結果。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { Func<string, Task<string>> asyncLambda = async name => { await Task.Delay(TimeSpan.FromSeconds(2)); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); }; string result = await asyncLambda("async lambda"); Console.WriteLine(result); } }
首先,因爲不能在Main方法中使用async,咱們將異步函數移到了Asynchronous Processing方法中。而後使用async關鍵字聲明瞭一個lambda表達式。因爲任何lambda表達式的類型都不能經過lambda自身來推斷,因此不得不顯式向C#編譯器指定它的類型。在本例中,該類型說明該lambda表達式接受一個字符串參數,並返回一個Task<string>對象。
接着,咱們定義了lambda表達式體。有個問題是該方法被定義爲返回一個Task<string>對象,但實際上返回的是字符串,卻沒有編譯錯誤!這是由於C#編譯器自動產生一個任務,並返回給咱們。
最後一步是等待異步lambda表達式執行並打印出結果。
本節將展現當代碼中有多個連續的await方法時程序的實際流程是怎樣的。咱們將學習如何閱讀有await方法的代碼,以及理解爲何await調用是異步操做。
class Program { static void Main(string[] args) { Task t = AsynchronyWithTPL(); t.Wait(); t = AsynchronyWithAwait(); t.Wait(); Console.ReadKey(); } static Task AsynchronyWithTPL() { var containerTask = new Task(() => { Task<string> t = GetInfoAsync("TPL 1"); t.ContinueWith(task => { Console.WriteLine(t.Result); Task<string> t2 = GetInfoAsync("TPL 2"); t2.ContinueWith(innerTask => Console.WriteLine(innerTask.Result), TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent); t2.ContinueWith(innerTask => Console.WriteLine(innerTask.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent); }, TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent); t.ContinueWith(task => Console.WriteLine(t.Exception.InnerException), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent); }); containerTask.Start(); return containerTask; } async static Task AsynchronyWithAwait() { try { string result = await GetInfoAsync("Async 1"); Console.WriteLine(result); result = await GetInfoAsync("Async 2"); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine(ex); } } async static Task<string> GetInfoAsync(string name) { Console.WriteLine("Task {0} started!", name); await Task.Delay(TimeSpan.FromSeconds(2)); if(name == "TPL 2") throw new Exception("Boom!"); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
當程序運行時,與上節同樣運行了兩個異步操做。然而此次從AsynchronyWithAwait方法講起。它看起來仍然像日常的同步代碼,惟一不一樣之處是使用了兩個await聲明。最重要的一點是該代碼依然是順序執行的, Async2任務只有等以前的任務完成後纔會開始執行。當閱讀該代碼時,程序流很清晰,能夠看到什麼先運行,什麼後運行。但該程序如何是異步程序呢?首先,它不老是異步的。當使用await時若是一個任務已經完成,咱們會異步地獲得該任務結果。不然,當在代碼中看到await聲明時,一般的行爲是方法執行到該await代碼行時將當即返回,而且剩下的代碼將會在一個後續操做任務中運行。所以等待操做結果時並無阻塞程序執行,這是一個異步調用。當AsynchronyWithAwait方法中的代碼在執行時,除了在Main方法中調用t.Wait外,咱們能夠執行任何其餘任務。然而, "主線程必須等待直到全部異步操做完成,不然主線程完成後全部運行異步操做的後臺線程! ",會中止運行。
AsynchronyWithTPL方法模仿了AsynchronyWithAwait的程序流。咱們須要一個容器任務來處理全部相互依賴的任務。而後啓動主任務,給其加了一組後續操做。當該任務完成後,會打印出其結果。而後又啓動了一個任務,在該任務完成後會依次運行更多的後續操"做。爲了測試對異常的處理,當運行第二個任務時故意拋出一個異常,並打印出異常信息。這組後續操做建立了與第一個方法中同樣的程序流。若是用它與await方法比較,能夠看到它更容易閱讀和理解。惟一的技巧是請記住異步並不老是意味着並行執行。
本節將學習如何使用await來並行地運行異步任務,而不是採用經常使用的順序執行。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); } async static Task AsynchronousProcessing() { Task<string> t1 = GetInfoAsync("Task 1", 3); Task<string> t2 = GetInfoAsync("Task 2", 5); string[] results = await Task.WhenAll(t1, t2); foreach (string result in results) { Console.WriteLine(result); } Console.ReadKey(); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); //await Task.Run(() => Thread.Sleep(TimeSpan.FromSeconds(seconds))); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
這裏定義了兩個異步任務,分別運行3秒和5秒。而後使用Task.WhenAll輔助方法創!建了另外一個任務,該任務只有在全部底層任務完成後纔會運行。以後咱們等待該組合任務的,結果。5秒後,咱們獲取了全部結果,說明了這些任務是同時運行的。
然而這裏觀察到一個有意思的現象。當運行該程序時,你可能注意到這兩個任務似平是,被線程池中的同一個工做線程執行的。當咱們並行運行任務時怎麼可能發生這樣的事情呢?爲了讓事情更有趣,咱們來註釋掉GetIntroAsync方法中的await Task.Delay代碼行,並解除,對await Task.Run代碼行的註釋,而後再次運行程序。
咱們會看到該狀況下兩個任務會被不一樣的工做線程執行。不一樣之處是Task.Delay在幕後使用了一個計時器,過程以下:從線程池中獲取工做線程,它將等待Task.Delay方法返回結,果。而後, Task.Delay方法啓動計時器並指定一塊代碼,該代碼會在計時器時間到了Task.Delay方法中指定的秒數後被調用。以後當即將工做線程返回到線程池中。當計時器事件運,行時,咱們又從線程池中任意獲取一個可用的工做線程(可能就是運行一個任務時使用的線,程)並運行計時器提供給它的代碼。
當使用Task.Run方法時,從線程池中獲取了一個工做線程並將其阻塞幾秒,具體秒數,由Thread.Sleep方法提供。而後獲取了第二個工做線程而且也將其阻塞。在這種場景下.咱們消費了兩個工做線程,而它們絕對什麼事沒作,由於在它們等待時不能執行任何其餘,操做。
咱們將在第9章中討論第一個場景的細節。在第9章咱們將討論用大量的異步操做進行,數據輸入和輸出。儘量地使用第一種方式是建立高伸縮性的服務器程序的關鍵。
本節將描述在C#中使用異步函數時如何處理異常。咱們將學習對多個並行的異步操做,使用await時如何聚合異常。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); } async static Task AsynchronousProcessing() { Console.WriteLine("1. Single exception"); try { string result = await GetInfoAsync("Task 1", 2); Console.WriteLine(result); } catch (Exception ex) { Console.WriteLine("Exception details: {0}", ex); } Console.WriteLine(); Console.WriteLine("2. Multiple exceptions"); Task<string> t1 = GetInfoAsync("Task 1", 3); Task<string> t2 = GetInfoAsync("Task 2", 2); try { string[] results = await Task.WhenAll(t1, t2); Console.WriteLine(results.Length); } catch (Exception ex) { Console.WriteLine("Exception details: {0}", ex); } Console.WriteLine(); Console.WriteLine("2. Multiple exceptions with AggregateException"); t1 = GetInfoAsync("Task 1", 3); t2 = GetInfoAsync("Task 2", 2); Task<string[]> t3 = Task.WhenAll(t1, t2); try { string[] results = await t3; Console.WriteLine(results.Length); } catch { var ae = t3.Exception.Flatten(); var exceptions = ae.InnerExceptions; Console.WriteLine("Exceptions caught: {0}", exceptions.Count); foreach (var e in exceptions) { Console.WriteLine("Exception details: {0}", e); Console.WriteLine(); } } Console.ReadKey(); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); throw new Exception(string.Format("Boom from {0}!", name)); } }
咱們運行了三個場景來展現在C#中使用async和await時關於錯誤處理的最多見狀況。第一種狀況是最簡單的,而且與常見的同步代碼幾乎徹底同樣。咱們只使用try/catch聲明即 ,可獲取異常細節。
一個很常見的錯誤是對一個以上的異步操做使用await時還使用以上方式。若是仍像第一種狀況同樣使用catch代碼塊,則只能從底層的AggregateException對象中獲得第一個異常。
爲了收集全部異常信息,能夠使用await任務的Exception屬性。在第三種狀況中,咱們使用AggregateException的Flatten方法將層級異常放入一個列表,而且從中提取出全部的底層異常。
本節描述了當使用await來獲取異步操做結果時,同步上下文行爲的細節。咱們將學習,如何以及什麼時候關閉同步上下文流。
加入對Windows Presentation Foundation庫的引用。
(1)右鍵點擊項目中的引用文件夾,選擇添加引用菜單選項。
(2)添加對PresentationCore, PresentationFramework, System.Xaml及Windows.Base庫的引用。
class Program { [STAThread] static void Main(string[] args) { var app = new Application(); var win = new Window(); var panel = new StackPanel(); var button = new Button(); _label = new Label(); _label.FontSize = 32; _label.Height = 200; button.Height = 100; button.FontSize = 32; button.Content = new TextBlock { Text = "Start asynchronous operations" }; button.Click += Click; panel.Children.Add(_label); panel.Children.Add(button); win.Content = panel; app.Run(win); Console.ReadLine(); } async static void Click(object sender, EventArgs e) { _label.Content = new TextBlock { Text = "Calculating..." }; TimeSpan resultWithContext = await Test(); TimeSpan resultNoContext = await TestNoContext(); //TimeSpan resultNoContext = await TestNoContext().ConfigureAwait(false); var sb = new StringBuilder(); sb.AppendLine(string.Format("With the context: {0}", resultWithContext)); sb.AppendLine(string.Format("Without the context: {0}", resultNoContext)); sb.AppendLine(string.Format("Ratio: {0:0.00}", resultWithContext.TotalMilliseconds / resultNoContext.TotalMilliseconds)); _label.Content = new TextBlock { Text = sb.ToString() }; } async static Task<TimeSpan> Test() { const int iterationsNumber = 100000; var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < iterationsNumber; i++) { var t = Task.Run(() => { }); await t; } sw.Stop(); return sw.Elapsed; } async static Task<TimeSpan> TestNoContext() { const int iterationsNumber = 100000; var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < iterationsNumber; i++) { var t = Task.Run(() => { }); await t.ConfigureAwait( continueOnCapturedContext: false); } sw.Stop(); return sw.Elapsed; } private static Label _label; }
在本例中,咱們將學習異步函數默認行爲的最重要的方面之一。咱們已經從第4章中瞭解了任務調度程序和同步上下文。默認狀況下, await操做符會嘗試捕獲同步上下文,並在其中執行代碼。咱們已經知道這有助於咱們編寫與用戶界面控制器協做的異步代碼。另外,使用await不會發生在以前章節中描述過的死鎖狀況,由於當等待結果時並不會阻塞UI線程。
這是合理的,可是讓咱們看看潛在會發生什麼事。在本例中,咱們使用編程方式建立了·一個Windows Presentation Foundation應用程序並訂閱了它的按鈕點擊事件。當點擊該按鈕!時,運行了兩個異步操做。其中一個使用了一個常規的await操做符,另外一個使用了帶false參數值的ConfigureAwait方法。false參數明確指出咱們不能對其使用捕獲的同步上下文來運行後續操做代碼。在每一個操做中,咱們測量了執行完成花費的時間,而後將各自的時間和比例顯示在主屏幕上。
結果看到常規的await操做符花費了更多的時間來完成。這是由於咱們向UI線程中放,入了成百上千個後續操做任務,這會使用它的消息循環來異步地執行這些任務。在本例中,咱們無需在UI線程中運行該代碼,由於異步操做並未訪問UI組件。使用帶false參數值的, ConfigureAwait方法是一個更高效的方案。
還有一件事值得一提。嘗試運行程序並只點擊按鈕而後等待結果,而後再這樣作一次,可是此次點擊按鈕後嘗試隨機地拖拽應用程序窗口從一側到另外一側。你將注意到在捕獲的同步上下文中的代碼執行速度變慢了!這個有趣的反作用完美演示了異步編程是多麼危險。經歷相似的狀況是很是容易的,並且若是你以前從未經歷過這樣的狀況,那麼幾乎不可能經過,調試來找出問題所在。
公平起見,讓咱們來看看相反的狀況。在前面的代碼片斷中,在Click方法中,取消注,釋的代碼行,並註釋掉緊挨着它的前一行代碼。當運行程序時,咱們將獲得多線程控制訪問異常,由於設置Label控制器文本的代碼不會放置到捕捉的上下文中,而是在線程池的工做,線程中執行。
本節描述了爲何使用async void方法很是危險。咱們將學習以及如何儘量地替代該方法。在哪一種狀況下可以使用該方,
class Program { static void Main(string[] args) { Task t = AsyncTask(); t.Wait(); AsyncVoid(); Thread.Sleep(TimeSpan.FromSeconds(3)); t = AsyncTaskWithErrors(); while(!t.IsFaulted) { Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine(t.Exception); //try //{ // AsyncVoidWithErrors(); // Thread.Sleep(TimeSpan.FromSeconds(3)); //} //catch (Exception ex) //{ // Console.WriteLine(ex); //} int[] numbers = new[] {1, 2, 3, 4, 5}; Array.ForEach(numbers, async number => { await Task.Delay(TimeSpan.FromSeconds(1)); if (number == 3) throw new Exception("Boom!"); Console.WriteLine(number); }); Console.ReadLine(); } async static Task AsyncTaskWithErrors() { string result = await GetInfoAsync("AsyncTaskException", 2); Console.WriteLine(result); } async static void AsyncVoidWithErrors() { string result = await GetInfoAsync("AsyncVoidException", 2); Console.WriteLine(result); } async static Task AsyncTask() { string result = await GetInfoAsync("AsyncTask", 2); Console.WriteLine(result); } private static async void AsyncVoid() { string result = await GetInfoAsync("AsyncVoid", 2); Console.WriteLine(result); } async static Task<string> GetInfoAsync(string name, int seconds) { await Task.Delay(TimeSpan.FromSeconds(seconds)); if(name.Contains("Exception")) throw new Exception(string.Format("Boom from {0}!", name)); return string.Format("Task {0} is running on a thread id {1}. Is thread pool thread: {2}", name, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } }
當程序啓動時,咱們經過調用AsyncTask和AsyncVoid這兩個方法啓動了兩個異步操做。第一個方法返回一個Task對象,而另外一個因爲被聲明爲async void因此沒有返回值。因爲它們都是異步的因此都會當即返回。可是第一個方法經過返回的任務狀態或對其調用, Wait方法從而很容易實現監控。等待第二個方法完成的惟一方式是確切地等待多長時間,由於咱們沒有聲明任何對象能夠監控該異步操做的狀態。固然能夠使用某種共享的狀態變量,將其設置到async void方法中,並從調用方法中檢查其值,但返回一個Task對象的方式更好些。
最危險的部分是異常處理。使用async void方法,異常處理方法將被放置到當前的同步上下文中,在本例中即線程池中。線程池中未被處理的異常會終結整個進程。使用 AppDomain.UnhandledException事件能夠攔截未被處理的異常,但不能從攔截的地方恢復進程。爲了重現該場景,能夠取消Main方法中對try/catch代碼塊的註釋,而後運行,程序,
關於使用async void lambda表達式的另外一個事實是:它們與Action類型是兼容的,而 Action類型在標準.NET Framework類庫中的使用很是普遍。在lambda表達式中很容易忘記對異常的處理,這將再次致使程序崩潰。能夠取消在Main方法中第二個被註釋的代碼塊的,註釋來重現該場景。
強烈建議只在UI事件處理器中使用async void方法。在其餘全部的狀況下,請使用返,回Task的方法。
本節將展現如何設計一個與await操做符兼容的很是基礎的awaitable類型。
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { var sync = new CustomAwaitable(true); string result = await sync; Console.WriteLine(result); var async = new CustomAwaitable(false); result = await async; Console.WriteLine(result); } class CustomAwaitable { public CustomAwaitable(bool completeSynchronously) { _completeSynchronously = completeSynchronously; } public CustomAwaiter GetAwaiter() { return new CustomAwaiter(_completeSynchronously); } private readonly bool _completeSynchronously; } class CustomAwaiter : INotifyCompletion { private string _result = "Completed synchronously"; private readonly bool _completeSynchronously; public bool IsCompleted { get { return _completeSynchronously; } } public CustomAwaiter(bool completeSynchronously) { _completeSynchronously = completeSynchronously; } public string GetResult() { return _result; } public void OnCompleted(Action continuation) { ThreadPool.QueueUserWorkItem( state => { Thread.Sleep(TimeSpan.FromSeconds(1)); _result = GetInfo(); if (continuation != null) continuation(); }); } private string GetInfo() { return string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } } }
爲了與await操做符保持兼容,類型應當遵照在C#5.0規格說明中的規定的一些要,求。若是你安裝了Visual Studio 2012,那麼能夠在C:Program FilesMicrosoft Visual Studio11.0VC#Specifications\1033 (假設你使用的是默認安裝路徑)目錄中找到該規格說明文檔。
在規格說明文檔的7.7.7.1節,咱們發現了awaitable表達式的定義:
Await表達式的任務被要求是awaitable,若是一個表達式t知足下面任意一條則認爲是, awaitable的:
這些信息足夠咱們開始了。首先咱們定義一個awaitable類型CustomAwaitable,並實現GetAwaiter方法,該方法返回一個CustomAwaiter類型的實例。CustomAwaiter實現了 .INotifyCompletion接口,擁有類型爲bool的IsCompleted屬性,而且有GetResult方法,該方法返回一個字符串類型。最後,咱們寫了一些代碼來建立兩個CustomAwaitable對象並對,其使用await關鍵字。
如今咱們應該理解await表達式執行的方式了。這裏並無引用規格說明文檔,以避免陷入沒必要要的細節。基本上,若是IsCompleted屬性返回true,則只需同步調用GetResult方法。這種作法防止了該操做已經完成後咱們仍然爲執行異步任務而分配資源。經過給 CustomAwaitable對象的構造函數傳遞completeSynchronously參數來展現該場景。
另外,咱們給CustomAwaiter的OnCompleted方法註冊了一個回調函數並啓動該異步操做。當操做完成時,就會調用提供的回調函數,該回調函數將會經過調用CustomAwaiter對象的GetResult方法來獲取結果。
本節展現瞭如何設計一個很是基本的類型,該類型可以與await操做符和動態C#類型兼容。
請執行如下步驟來添加對Impromptulnterface NuGet包的引用:
(1)右鍵點擊項目中的引用文件夾,並選擇管理NuGet包 菜單選項。
(2)添加對你喜歡的Impromptulnterface NuGet包的引用。能夠使用管理NuGet包對話框的搜索功能
class Program { static void Main(string[] args) { Task t = AsynchronousProcessing(); t.Wait(); Console.ReadKey(); } async static Task AsynchronousProcessing() { string result = await GetDynamicAwaitableObject(true); Console.WriteLine(result); result = await GetDynamicAwaitableObject(false); Console.WriteLine(result); } static dynamic GetDynamicAwaitableObject(bool completeSynchronously) { dynamic result = new ExpandoObject(); dynamic awaiter = new ExpandoObject(); awaiter.Message = "Completed synchronously"; awaiter.IsCompleted = completeSynchronously; awaiter.GetResult = (Func<string>)(() => awaiter.Message); awaiter.OnCompleted = (Action<Action>) ( callback => ThreadPool.QueueUserWorkItem(state => { Thread.Sleep(TimeSpan.FromSeconds(1)); awaiter.Message = GetInfo(); if (callback != null) callback(); }) ); IAwaiter<string> proxy = Impromptu.ActLike(awaiter); result.GetAwaiter = (Func<dynamic>) ( () => proxy ); return result; } static string GetInfo() { return string.Format("Task is running on a thread id {0}. Is thread pool thread: {1}", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread); } } public interface IAwaiter<T> : INotifyCompletion { bool IsCompleted { get; } T GetResult(); }
這裏咱們重複了5.9節的技巧,可是此次藉助於動態表達式,能夠使用NuGet來實現該目標。NuGet是一個包含了不少有用的庫的包管理器。此次咱們將使用一個庫來動態地建立,封裝對象,實現咱們須要的接口。
首先咱們建立了ExpandoObject類型的兩個實例,並把它們分配給動態的局部變量。這些變量將成爲awaitable和awaiter對象。因爲一個awaitable對象只須要擁有GetAwaiter方,法,提供該方法沒有問題。使用dynamic關鍵字組合ExpandoOibect容許咱們自定義該對象,並經過分配相應的值來添加屬性和方法。事實上它是一個字典類型的集合,鍵類型是string,值類型是object,若是你很熟悉JavaScript編程語言,你可能會注意到它與JavaScript對象很類似。
因爲dynamic關鍵字容許咱們跳過C#的編譯時檢查。ExpandObject是以這樣的方式編,寫的:當你給屬性分配值時, ExpandObject建立了一個字典條目,鍵是屬性名,值是賦予的任何值。當嘗試獲取屬性值時,會在字典中查找並提供存儲在相應的字典條目中的值。若是該值是Action或Func類型,咱們實際上存儲了一個委託,它能夠當作方法使用。所以, ExpandoObject與dynamic類型的組合容許咱們建立一個對象並動態地賦予其屬性和方法。
如今咱們須要構造自定義的awaiter和awaitable對象。先從awaiter開始。首先提供一個名爲Message的屬性並賦予初始值,而後使用Func<string>類型定義了GetResult方法.並分配一個lambda表達式,該表達式返回Message屬性值。接下來實現IsCompleted屬性。若是其值爲true,則跳過剩下的工做並處理存儲在result局部變量中的awaitable對象。咱們只須要添加一個方法用於返回該dynamic對象並從該對象返回awaiter對象。咱們能夠使用 result做爲await表達式。然而,它將會同步運行。
主要的挑戰是在動態對象中實現異步處理。C#語言規格說明規定awaiter必須實現, INotifyCompletion或ICriticalNotifyCompletion接口,可是ExpandoObject卻沒有。甚至當咱們動態地實現OnCompleted方法並添加到awaiter對象時,這仍然行不通,由於該對象沒有,實現上面提到的任何一個接口。
爲了解決該問題,咱們使用了NuGet提供的Impromptulnterface庫。它容許咱們使用 Impromptu.ActLike方法來動態地建立代理對象,該對象將實現任何須要的接口。若是咱們嘗試建立一個實現了INotifyCompletion接口的代理,仍然行不通,由於該代理對象再也不是動態的,而且該接口只有OnCompleted方法,但沒有IsCompleted屬性或GetResult方法。做爲最後的解決辦法,咱們定義了一個泛型接口, IAwaiter<T>,它實現了INotifyCompletion並添加了全部須要的屬性和方法。如今,咱們使用它生成代理並修改result對象來從GetAwaiter方法返回一個代理,而不是返回awaiter對象。如今程序能夠工做了,咱們構造了一個在運行時徹底動態的awaitable對象。
編程須要對基本的數據結構和算法有所瞭解。程序員爲併發狀況選擇最合適的數據結構,那就須要知道不少事情,例如算法運行時間、空間複雜度,以及大寫0標記法等。在不一樣的廣爲人知的場景中,咱們總知道哪一種數據結構更高效。
對於並行計算,咱們須要使用適當的數據結構。這些數據結構具有可伸縮性,儘量地, "避免鎖,同時還能提供線程安全的訪問。.NET framework版本4引入了System.Collections.Concurrent命名空間,其中包含了一些數據結構。在本章中,咱們將展現這些數據結構並經過簡單的例子來講明如何使用它們。
先從ConcurrentQueue開始。該集合使用了原子的比較和交換(Compare and Swap,簡稱CAS)操做,以及SpinWait來保證線程安全。它實現了一個先進先出( First In FirstOut,簡稱FIFO)的集合,這意味着元素出隊列的順序與加入隊列的順序是一致的。能夠調用Enqueue方法向隊列中加入元素。TryDequeue方法試圖取出隊列中的第一個元素,而 TryPeek方法則試圖獲得第一個元素但並不從隊列中刪除該元素。
ConcurrentStack的實現也沒有使用任何鎖,只採用了CAS操做。它是一個後進先出, (Last In First Out,簡稱LIFO)的集合,這意味着最近添加的元素會先返回。能夠使用Push和PushRange方法添加元素,使用TryPop和TryPopRange方法獲取元素,以及使用TryPeek方法檢查元素。
ConcurrentBag是一個支持重複元素的無序集合。它針對這樣如下狀況進行了優化,即多個線程以這樣的方式工做:每一個線程產生和消費本身的任務,極少與其餘線程的任務交互 (若是要交互則使用鎖),添加元素使用Add方法,檢查元素使用TryPeek方法,獲取元素使,用TryTake方法。
請避免使用上面說起的集合的Count屬性。實現這些集合使用的是鏈表, Count操做的時間複雜度爲0(N)。若是想檢查集合是否爲空,請使用IsEmpty屬性,其時間複雜度爲0(1),
ConcurrentDictionary是一個線程安全的字典集合的實現。對於讀操做無需使用鎖。可是對於寫操做則須要鎖。該併發字典使用多個鎖,在字典桶之上實現了一個細粒度的鎖模型。使用參數concurrencyLevel能夠在構造函數中定義鎖的數量,這意味着預估的線程數量將併發地更新該字典。
因爲併發字典使用鎖,因此一些操做須要獲取該字典中的全部鎖。若是不必請避免使用如下操做: Count, IsEmpty, Keys, Values, CopyTo及ToArray。
BlockingCollection是對IProducerConsumerCollection泛型接口的實現的一個高級封裝。它有不少先進的功能來實現管道場景,即當你有一些步驟須要使用以前步驟運行的結果時。BlockingCollectione類支持以下功能:分塊、調整內部集合容量、取消集合操做、從多個塊集合中獲取元素。
本節展現了一個很是簡單的場景,比較在單線程環境中使用一般的字典集合與使用併發字典的性能。
class Program { static void Main(string[] args) { var concurrentDictionary = new ConcurrentDictionary<int, string>(); var dictionary = new Dictionary<int, string>(); var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { dictionary[i] = Item; } } sw.Stop(); Console.WriteLine("Writing to dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { concurrentDictionary[i] = Item; } sw.Stop(); Console.WriteLine("Writing to a concurrent dictionary: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { CurrentItem = dictionary[i]; } } sw.Stop(); Console.WriteLine("Reading from dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { CurrentItem = concurrentDictionary[i]; } sw.Stop(); Console.WriteLine("Reading from a concurrent dictionary: {0}", sw.Elapsed); Console.ReadKey(); } const string Item = "Dictionary item"; public static string CurrentItem; }
當程序啓動時咱們建立了兩個集合,其中一個是標準的字典集合,另外一個是新的併發字典集合。而後採用鎖的機制向標準的字典中添加元素,並測量完成100萬次迭代的時間。一樣也採用一樣的場景來測量ConcurrentDictionary的性能,最後比較從兩個集合中獲取值的性能。
經過這個很是簡單的場景,咱們發現ConcurrentDictionary寫操做比使用鎖的一般的字典要慢得多,而讀操做則要快些。所以若是對字典須要大量的線程安全的讀操做, ConcurrentDictionary是最好的選擇。
若是你對字典只須要多線程訪問只讀元素,則不必執行線程安全的讀操做。在此場景中最好只使用一般的字典或ReadOnlyDictionary集合。
ConcurrentDictionary的實現使用了細粒度鎖( fine-grained locking)技術,這在多線程寫入方面比使用鎖的一般的字典(也被稱爲粗粒度鎖)的可伸縮性更好。正如本例中所示,當只用一個線程時,併發字典很是慢,可是擴展到5到6個線程(若是有足夠的CPU核心來同時運行它們),併發字典的性能會更好。
本節將展現建立能被多個工做者異步處理的一組任務的例子
class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadKey(); } static async Task RunProgram() { var taskQueue = new ConcurrentQueue<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskQueue)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i-1] = Task.Run( () => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask {Id = i}; queue.Enqueue(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } } static async Task TaskProcessor( ConcurrentQueue<CustomTask> queue, string name, CancellationToken token) { CustomTask workItem; bool dequeueSuccesful = false; await GetRandomDelay(); do { dequeueSuccesful = queue.TryDequeue(out workItem); if (dequeueSuccesful) { Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
當程序運行時,咱們使用ConcurrentQueue集合實例建立了一個任務隊列。而後建立了一個取消標誌,它是用來在咱們將任務放入隊列後中止工做的。接下來啓動了一個單獨的工,做線程來將任務放入任務隊列中。該部分爲異步處理產生了工做量。
如今定義該程序中消費任務的部分。咱們建立了四個工做者,它們會隨機等待一段時,間,而後從任務隊列中獲取一個任務,處理該任務,一直重複整個過程直到咱們發出取消標誌信號。最後,咱們啓動產生任務的線程,等待該線程完成。而後使用取消標誌給消費者發信號咱們完成了工做。最後一步將等待全部的消費者完成。
咱們看到隊列中的任務按從前到後的順序被處理,但一個後面的任務是有可能會比前面的任務先處理的,由於咱們有四個工做者獨立地運行,並且任務處理時間並非恆定的。我,們看到訪問該隊列是線程安全的,沒有一個元素會被提取兩次。
.本節是前一小節的細微修改版。咱們又一次建立了被多個工做者異步處理的一組任務,可是此次使用ConcurrentStack來實現並看看有什麼不一樣。
class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadKey(); } static async Task RunProgram() { var taskStack = new ConcurrentStack<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskStack)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i - 1] = Task.Run( () => TaskProcessor(taskStack, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentStack<CustomTask> stack) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask { Id = i }; stack.Push(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } } static async Task TaskProcessor( ConcurrentStack<CustomTask> stack, string name, CancellationToken token) { await GetRandomDelay(); do { CustomTask workItem; bool popSuccesful = stack.TryPop(out workItem); if (popSuccesful) { Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
當程序運行時,咱們建立了一個ConcurrentStack集合的實侈e其他的代碼與前一小節中幾乎同樣,惟一不一樣之處是咱們對併發堆棧使用Push和TryPop方法,而對併發隊列使用Enqueue和TryDequeue方法。
如今能夠看到任務處理的順序被改變了。堆棧是一個LIFO集合,工做者先處理最近的,任務。在併發隊列中,任務被處理的順序與被添加的順序幾乎一致。這意味着根據工做者的!數量,咱們必將在必定時間窗內處理先被建立的任務。而在堆棧中,早先建立的任務具備較低的優先級,並且直到生產者中止向堆棧中放入更多任務後,該任務纔有可能被處理。這種行爲是肯定的,最好在該場景下使用隊列。
本節展現了在多個獨立的既可生產工做又可消費工做的工做者間如何擴展工做量。
class Program { static void Main(string[] args) { CreateLinks(); Task t = RunProgram(); t.Wait(); } static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>(); static async Task RunProgram() { var bag = new ConcurrentBag<CrawlingTask>(); string[] urls = new[] {"http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/"}; var crawlers = new Task[4]; for (int i = 1; i <= 4; i++) { string crawlerName = "Crawler " + i.ToString(); bag.Add(new CrawlingTask { UrlToCrawl = urls[i-1], ProducerName = "root"}); crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName)); } await Task.WhenAll(crawlers); Console.ReadKey(); } static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName) { CrawlingTask task; while (bag.TryTake(out task)) { IEnumerable<string> urls = await GetLinksFromContent(task); if (urls != null) { foreach (var url in urls) { var t = new CrawlingTask { UrlToCrawl = url, ProducerName = crawlerName }; bag.Add(t); } } Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!", task.UrlToCrawl, task.ProducerName, crawlerName); } } static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task) { await GetRandomDelay(); if (_contentEmulation.ContainsKey(task.UrlToCrawl)) return _contentEmulation[task.UrlToCrawl]; return null; } static void CreateLinks() { _contentEmulation["http://microsoft.com/"] = new [] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" }; _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" }; _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" }; _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" }; _contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" }; _contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" }; _contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" }; _contentEmulation["http://facebook.com/"] = new [] { "http://facebook.com/a.html", "http://facebook.com/b.html" }; _contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" }; _contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" }; _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" }; _contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" }; _contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" }; _contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" }; _contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" }; _contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" }; } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(150, 200); return Task.Delay(delay); } class CrawlingTask { public string UrlToCrawl { get; set; } public string ProducerName { get; set; } } }
該程序模擬了使用多個網絡爬蟲進行網頁索引的場景。網絡爬蟲是這樣一個程序:它使用網頁地址打開一個網頁,索引該網頁內容,嘗試訪問該頁面包含的全部連接,而且也索引這些連接頁面。剛開始,咱們定義了一個包含不一樣網頁URL的字典。該字典模擬了包含其,他頁面連接的網頁。該實現很是簡單,並不關心索引已經訪問過的頁面,但正由於它如此簡單咱們才能夠關注並行工做負載。
接着建立了一個併發包,其中包含爬蟲任務。咱們建立了四個爬蟲,而且給每一個爬蟲都提供了一個不一樣的網站根URL,而後等待全部爬蟲完成工做。如今每一個爬蟲開始檢索提供給,它的網站URL,咱們經過等待一個隨機事件來模擬網絡10處理。若是頁面包含的URL越多,爬蟲向包中放入的任務也會越多。而後檢查包中是否還有任何須要爬蟲處理的任務,若是沒有說明爬蟲完成了工做。
若是檢查前四個根URL後的第一行輸出內容,咱們將看到被爬蟲N放置的任務一般會,被同一個爬蟲處理。然而,接下來的行則會不一樣。這是由於ConcurrentBag內部針對多個線程既能夠添加元素又能夠刪除元素的場景進行了優化。實現方式是每一個線程使用本身的本地,隊列的元素,因此使用該隊列時無需任何鎖。只有當本地隊列中沒有任何元素時,咱們才執,行一些鎖定操做並嘗試從其餘線程的本地隊列中「偷取」工做。這種行爲有助於在全部工做,者間分發工做並避免使用鎖。
本節將描述如何使用BlockingCollection來簡化實現異步處理的工做負載。
class Program { static void Main(string[] args) { Console.WriteLine("Using a Queue inside of BlockingCollection"); Console.WriteLine(); Task t = RunProgram(); t.Wait(); Console.WriteLine(); Console.WriteLine("Using a Stack inside of BlockingCollection"); Console.WriteLine(); t = RunProgram(new ConcurrentStack<CustomTask>()); t.Wait(); } static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null) { var taskCollection = new BlockingCollection<CustomTask>(); if(collection != null) taskCollection= new BlockingCollection<CustomTask>(collection); var taskSource = Task.Run(() => TaskProducer(taskCollection)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = "Processor " + i; processors[i - 1] = Task.Run( () => TaskProcessor(taskCollection, processorId)); } await taskSource; await Task.WhenAll(processors); } static async Task TaskProducer(BlockingCollection<CustomTask> collection) { for (int i = 1; i <= 20; i++) { await Task.Delay(20); var workItem = new CustomTask { Id = i }; collection.Add(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } collection.CompleteAdding(); } static async Task TaskProcessor( BlockingCollection<CustomTask> collection, string name) { await GetRandomDelay(); foreach (CustomTask item in collection.GetConsumingEnumerable()) { Console.WriteLine("Task {0} has been processed by {1}", item.Id, name); await GetRandomDelay(); } } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } }
先說第一個場景,這裏咱們使用了BlockingCollection類,它帶來了不少優點。首先,咱們可以改變任務存儲在阻塞集合中的方式。默認狀況下它使用的是ConcurrentQueue容器,可是咱們可以使用任何實現了IProducerConsumerCollection泛型接口的集合。爲了演示該點,咱們運行了該程序兩次,第二次時使用ConcurrentStack做爲底層集合。
工做者經過對阻塞集合迭代調用GetConsumingEnumerable方法來獲取工做項。若是在該集合中沒有任何元素,迭代器會阻塞工做線程直到有元素被放置到集合中。當生產者調用集合的CompleteAdding時該迭代週期會結束。這標誌着工做完成了。
這裏很容易犯一個錯誤,即對BlockingCollection進行迭代,由於它自身實現了IEnumerable接口。不要忘記使用GetConsumingEnumerable,不然你迭代的只是集合的「快照」,這並非指望的程序行爲。
工做量生產者將任務插入到BlockingCollection而後調用CompleteAdding方法,這會使全部工做者完成工做。如今在程序輸出中咱們看到兩個結果序列,演示了併發隊列和堆棧集合的不一樣之處。
NET Framework庫中有個子集叫作並行庫,一般被稱爲並行框架擴展( Parallel Framework Extensions,簡稱PFX),這是這些庫很是早期的版本的名稱。並行庫隨着.NET Framework 4.0一塊兒發佈,包含三大主要部分:
事實上咱們將 "程序分割成一組任務並使用不一樣的線程來運行不一樣的任務。這種方式被稱爲任務並行( task parallelism), 目前咱們只學習了任務並行。.
想象一下咱們有一個程序針對一組大數據進行重量級運算。並行運行該程最容易的方式,是將該組數據分割成較小的數據塊,對這些數據塊進行並行計算,而後聚合這些計算結果。這種編程模型稱爲數據並行(data parallelism)。
任務並行是最底層的抽象層。咱們將程序定義爲任務的組合,顯式地定義這些任務如何組合。由此方式組成的程序會很是複雜和細節化。並行操做被定義在該程序的不一樣位置,隨着並行操做的增加,程序變得愈來愈難理解和維護。採用這種方式來並行程序被稱爲無結構的並行(unstructured parallelism),這就是咱們爲複雜的並行邏輯付出的代價。
然而,當咱們有較簡單的程序邏輯時,咱們能夠將更多的並行細節推給PFX庫和C#編譯器。例如,咱們能夠說, 「我想以並行方式運行這三個方法,但我不關心是如何實現並行的,讓NET基礎設施決定細節。」這產生了一個抽象層使得咱們不用提供一個關於如何實現並行的細節描述。這種方式被稱爲結構並行( structured parallelism),由於並行一般是一組聲明,而且在程序中每一個並行狀況並定義在確切的地方。
這可能致使一種印象,即無結構並行是一種很差的實踐,應該始終使用結構並行替代它。我想強調這一點是不對的。結構並行確實更易維護,應該儘量地使用,可是它並非萬能的。一般有不少狀況咱們不能簡單地使用結構並行,那麼以非結構化的方式使用TPL任務並行也是徹底能夠的。
任務並行庫中有一個名爲Parallel的類,其提供了一組API用來實現結構並行。它仍然是TPL的一部分,咱們在本章介紹它的緣由是它是從較低的抽象層向較高的抽象層過渡的完美例子。當使用Parallel類的API時,咱們無需提供分割工做的細節。可是咱們仍要顯式定義如何從分割的結果中獲得單個結果。
PLINQ具備最高級抽象。它自動將數據分割爲數據塊,而且決定是否真的須要並行化查詢,或者使用一般的順序查詢處理更高效。PLINO基礎設施會將分割任務的執行結果組合到一塊兒。有不少選項可供程序員來優化查詢,使用盡量高的性能獲取結果。
在本章中咱們將涵蓋Parallel類的用法以及不少不一樣的PLINQ選項,例如讓LINQ查詢並行化,設置異常模型及設置PLINQ查詢的並行等級,處理查詢項的順序,以及處理, PLINQ異常。咱們也會學習如何管理PLINO查詢的數據分割。
本節展現瞭如何使用Parallel類的API,咱們將學習如何並行地調用方法,如何執行並, "行的循環,以及調整並行機制。
class Program { static void Main(string[] args) { Parallel.Invoke( () => EmulateProcessing("Task1"), () => EmulateProcessing("Task2"), () => EmulateProcessing("Task3") ); var cts = new CancellationTokenSource(); var result = Parallel.ForEach( Enumerable.Range(1, 30), new ParallelOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = Environment.ProcessorCount, TaskScheduler = TaskScheduler.Default }, (i, state) => { Console.WriteLine(i); if (i == 20) { state.Break(); Console.WriteLine("Loop is stopped: {0}", state.IsStopped); } }); Console.WriteLine("---"); Console.WriteLine("IsCompleted: {0}", result.IsCompleted); Console.WriteLine("Lowest break iteration: {0}", result.LowestBreakIteration); Console.ReadKey(); } static string EmulateProcessing(string taskName) { Thread.Sleep(TimeSpan.FromMilliseconds( new Random(DateTime.Now.Millisecond).Next(250, 350))); Console.WriteLine("{0} task was processed on a thread id {1}", taskName, Thread.CurrentThread.ManagedThreadId); return taskName; } }
該程序演示了Parallel類的不一樣功能。與在任務並行庫中定義任務的方式相比,調用 "Invoke方法能夠免去不少麻煩就可實現並行地運行多個任務。Invoke方法會阻塞其餘線程直到全部的任務都被完成,這是一個很是常見的方面使用Invoke方法的場景。
下一個功能是並行循環,使用For和ForEach方法來定義循環。由ForEach方法與For方法很是類似,咱們將仔細講解ForEach方法。並行ForEach循環能夠經過給每一個集合項應用一個action委託的方式,實現並行地處理任何IEnumerable集合。咱們能夠提供幾種選項,自定義並行行爲,並獲得一個結果來講明循環是否成功完成。
能夠給ForEach方法提供一個ParallelOptions類的實例來控制並行循環。其容許咱們使用CancellationToken取消循環,限制最大並行度(並行運行的最大操做數),還能夠提供一個自定義的TaskScheduler類來調度任務。Action能夠接受一個附加的ParallelLoopState參數.可用於從循環中跳出或者檢查當前循環的狀態。
使用ParallelLoopState有兩種方式中止並行循環。既能夠使用Break方法,也能夠使用Stop方法。Stop方法告訴循環中止處理任何工做,並設置並行循環狀態屬性, IsStopped值爲true, Break方法中止其以後的迭代,但以前的迭代還要繼續工做。在那,種狀況下,循環結果的LowestBreaklteration屬性將會包含當Break方法被調用時的最低,循環次數。
本節將描述如何使用PLINQ來並行化查詢,以及如何將並行查詢改成順序處理。
class Program { static void Main(string[] args) { var parallelQuery = from t in GetTypes().AsParallel() select EmulateProcessing(t); var cts = new CancellationTokenSource(); cts.CancelAfter(TimeSpan.FromSeconds(3)); try { parallelQuery .WithDegreeOfParallelism(Environment.ProcessorCount) .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .WithMergeOptions(ParallelMergeOptions.Default) .WithCancellation(cts.Token) .ForAll(Console.WriteLine); } catch (OperationCanceledException) { Console.WriteLine("---"); Console.WriteLine("Operation has been canceled!"); } Console.WriteLine("---"); Console.WriteLine("Unordered PLINQ query execution"); var unorderedQuery = from i in ParallelEnumerable.Range(1, 30) select i; foreach (var i in unorderedQuery) { Console.WriteLine(i); } Console.WriteLine("---"); Console.WriteLine("Ordered PLINQ query execution"); var orderedQuery = from i in ParallelEnumerable.Range(1, 30).AsOrdered() select i; foreach (var i in orderedQuery) { Console.WriteLine(i); } Console.ReadKey(); } static string EmulateProcessing(string typeName) { Thread.Sleep(TimeSpan.FromMilliseconds( new Random(DateTime.Now.Millisecond).Next(250,350))); Console.WriteLine("{0} type was processed on a thread id {1}", typeName, Thread.CurrentThread.ManagedThreadId); return typeName; } static IEnumerable<string> GetTypes() { return from assembly in AppDomain.CurrentDomain.GetAssemblies() from type in assembly.GetExportedTypes() where type.Name.StartsWith("Web") orderby type.Name.Length select type.Name; } }
當程序運行時,咱們建立了一個LINQ查詢,其使用反射API來查詢加載到當前應用程,序域中的全部組件中名稱以「Web"開頭的類型。咱們使用EmulateProcessing方法模擬處理每一個項時間的延遲,並使用PrintInfo方法打印結果。咱們也使用了Stopwatch類來測量每一個查詢的執行時間。
首先咱們運行了一個一般的順序LINQ查詢。此時並無並行化,全部任何操做都運,行在當前線程。該查詢的第二版顯式地使用了ParallelEnumerable類。ParallelEnumerable包含了PLINO的邏輯實現,而且做爲IEnumerable集合功能的一組擴展方法。一般無需顯式,地使用該類,在這裏是爲了演示PLINQ的實際工做方式。第二個版本以並行的方式運行, "EmulateProcessing操做。然而,默認狀況下結果會被合併到單個線程中,因此查詢的執行時,間應該比第一個版本少幾秒。
第三個版本展現瞭如何使用AsParallel方法來將LINO查詢按聲明的方式並行化運行。這裏咱們並不關心實現細節,只是爲了說明咱們想以並行的方式運行。然而,該版本的關鍵不一樣處是咱們使用了ForAll方法來打印查詢結果。打印結果操做與任務被處理的線程是同一個線程,跳過告終果合併步驟。它容許咱們也能以並行的方式運行PrintInfo方法,甚至該版本運行速度比以前的版本更快。
最後一個例子展現瞭如何使用AsSequential方法將PLINQ查詢以順序方式運行。能夠看到該查詢運行方式與第一個示例徹底同樣。
若是在客戶端運行程序,最重要的事情之一是有一個響應的用戶界面。這意味着不管應用程序發生什麼,全部的用戶界面元素(好比按鈕和進度條)都要保持快速運行,用戶可以從應用程序獲得快速響應。達到該點並不容易!若是你嘗試在Windows系統中打開記事本編輯器並加載一個有幾個兆字節大小的文檔,應用程序窗口將凍結一段顯著的時間,由於整個文檔要先從硬盤中加載,而後程序才能開始處理用戶輸入。
這是一個很是重要的問題,在該狀況下,惟一方案是不管如何都要避免阻塞UI線程。這反過來意味着爲了防止阻塞UI線程,每一個與UI有關的API必須只被容許異步調用。這是Window 8操做系統從新升級API的關鍵緣由,其幾乎把每一個方法替換爲異步方式。可是若是應用程序使用多線程來達到此目的會影響性能嗎?固然會!然而考慮到只有一個用戶,那麼這是划算的。若是應用程序能夠使用電腦的全部能力從而變得更加高效,並且該能力只爲運行程序的惟一用戶服務,這是好事。
接下來看看第二種狀況。若是程序運行在服務器端,則是徹底不一樣的情形。可伸縮性是最高優先級,這意味着單個用戶消耗越少的資源越好。若是爲每一個用戶建立多個線程,則!可伸縮性並很差。以高效的方式來平衡應用程序資源的消耗是個很是複雜的問題。例如,在ASPNET (其是微軟提供的web應用程序平臺)中,咱們使用工做線程池來服務客戶端請求。該池的工做線程數是有限的,因此不得不最小化每一個工做線程的使用時間以便達到高伸縮性。這意味着須要把工做線程越快越好地放回到池中,從而能夠服務下一個請求。若是咱們啓動了一個須要計算的異步操做,則整個工做流程會很低效。首先從線程池中取出一個工做線程用以服務客戶端請求。而後取出另外一個工做線程並開始處理異步操做。如今有兩個工做線程都在處理請求,若是第一個線程能作些有用的事則很是好!遺憾的是,一般狀況是咱們簡單等待異步操做完成,可是咱們卻消費了兩個工做線程,而不是一個。在該場景中,異步比同步執行實際上更糟糕!咱們不須要使用全部CPU核心,由於咱們已經在服務不少客戶端,它們已經使用了CP的全部計算能力。咱們無須保持第一個線程響應,由於這沒有用戶界面。那麼爲何咱們應該在服務器端使用異步呢?
答案是隻有異步輸人/輸出操做才應該使用異步。目前,現代計算機一般有一個磁盤驅動器來存儲文件,一塊網卡來經過網絡發送與接收數據。全部這些設備都有本身的微型計算機,以很是底層的方式來管理輸入/輸出操做併發信號給操做系統結果。這又是一個很是複雜的主題。但爲了讓概念清楚,咱們能夠這樣說,有一種方式讓程序員開始一個輸人/輸出,操做,並提供給操做系統一段代碼,當操做完成後被該代碼會被調用。在啓動I/O任務與完我之間,並不須要CPU工做。這是由相應的磁盤和網絡控制器的微型計算機完成的。這種執行I/O任務的方式被稱爲I/O線程。實現時使用的是,NET線程池,而且使用了一個來自操做系統的基礎設施,叫作I/O完成端口。
在APSNET中,一旦有一個異步的I/O操做在工做線程中開始時,它會被當即返回到線程池中。當該操做繼續運行時,該線程能夠服務其餘的客戶端。最終,當操做發出信號完成時, ASPNET基礎設施從線程池中獲取一個空閒的工做線程(該線程可能與操做開始時的!線程不一樣),而後會完成該操做。
好的,咱們如今瞭解了I/O線程對服務器應用程序的重要性。遺憾的是,很難看出,哪些API在底層使用了I/O線程。除了學習源代碼外,惟一的方式是簡單知道哪一個NET , Framework類庫對I/O線程進行了優化。在本章中,咱們將學習如何使用一些這樣的API,咱們將學習如何異步操做文件,如何使用網絡I/O來建立一個HTTP服務器並調用Windows Communication Foundation服務,以及如何使用異步API來查詢數據庫。
另外一個須要考慮的重要問題是並行。因爲一些緣由,集中地並行磁盤操做可能致使很低的性能。請記住並行I/O操做常常很是低效,順序執行I/O要好一些,可是要以異步的方式執行。
本節講述瞭如何建立一個文件,而且以異步的方式讀寫數據。
internal class Program { static void Main(string[] args) { var t = ProcessAsynchronousIO(); t.GetAwaiter().GetResult(); Console.ReadKey(); } const int BUFFER_SIZE = 4096; async static Task ProcessAsynchronousIO() { using (var stream = new FileStream("test1.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.None, BUFFER_SIZE)) { Console.WriteLine("1. Uses I/O Threads: {0}", stream.IsAsync); byte[] buffer = Encoding.UTF8.GetBytes(CreateFileContent()); var writeTask = Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null); await writeTask; } using (var stream = new FileStream("test2.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.None, BUFFER_SIZE, FileOptions.Asynchronous)) { Console.WriteLine("2. Uses I/O Threads: {0}", stream.IsAsync); byte[] buffer = Encoding.UTF8.GetBytes(CreateFileContent()); var writeTask = Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null); await writeTask; } using (var stream = File.Create("test3.txt", BUFFER_SIZE, FileOptions.Asynchronous)) using (var sw = new StreamWriter(stream)) { Console.WriteLine("3. Uses I/O Threads: {0}", stream.IsAsync); await sw.WriteAsync(CreateFileContent()); } using (var sw = new StreamWriter("test4.txt", true)) { Console.WriteLine("4. Uses I/O Threads: {0}", ((FileStream)sw.BaseStream).IsAsync); await sw.WriteAsync(CreateFileContent()); } Console.WriteLine("Starting parsing files in parallel"); Task<long>[] readTasks = new Task<long>[4]; for (int i = 0; i < 4; i++) { readTasks[i] = SumFileContent(string.Format("test{0}.txt", i + 1)); } long[] sums = await Task.WhenAll(readTasks); Console.WriteLine("Sum in all files: {0}", sums.Sum()); Console.WriteLine("Deleting files..."); Task[] deleteTasks = new Task[4]; for (int i = 0; i < 4; i++) { string fileName = string.Format("test{0}.txt", i + 1); deleteTasks[i] = SimulateAsynchronousDelete(fileName); } await Task.WhenAll(deleteTasks); Console.WriteLine("Deleting complete."); } static string CreateFileContent() { var sb = new StringBuilder(); for (int i = 0; i < 100000; i++) { sb.AppendFormat("{0}", new Random(i).Next(0, 99999)); sb.AppendLine(); } return sb.ToString(); } async static Task<long> SumFileContent(string fileName) { using (var stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.None, BUFFER_SIZE, FileOptions.Asynchronous)) using (var sr = new StreamReader(stream)) { long sum = 0; while (sr.Peek() > -1) { string line = await sr.ReadLineAsync(); sum += long.Parse(line); } return sum; } } static Task SimulateAsynchronousDelete(string fileName) { return Task.Run(() => File.Delete(fileName)); } }
當程序運行時,咱們以不一樣的方式建立了4個文件,而且填充了隨機數據。在第一個例 子中,使用的是FileStream類以及其方法,將異步編程模型API轉換成任務。第二個例子中也同樣,可是給FileStream構造函數提供了FileStrearn.Asynchronous參數。
使用FileOptions.Asynchronous選項是很是重要的。若是忽略該選項,咱們依然能夠以異步的方式使用該文件,但這只是在線程池中的異步委託調用。只有提供了該選項(或者在另外一個構造函數重載中使用bool useAsync),才能對FileStream類使用異步1O,
第三個例子使用了一些簡化的API,好比File.Create方法和StreamWrite類。它也使用 1/0線程,咱們能夠使用Stream.IsAsync屬性來檢查。最後一個例子說明了過度簡化也很差。這裏咱們藉助於異步委託調用來模擬異步1O,其實並無使用異步1O。
接着並行地異步地從全部文件中讀取數據,統計每一個文件內容,而後求總和。最後,刪除全部文件。因爲在任何非Windows商店應用程序中並無異步刪除文件的API,咱們使用 Task.Run工廠方法來模擬異步刪除文件。
本節展現瞭如何編寫一個簡單的異步HTTP服務器。
class Program { static void Main(string[] args) { var server = new AsyncHttpServer(portNumber: 1234); var t = Task.Run(() => server.Start()); Console.WriteLine("Listening on port 1234. Open http://localhost:1234 in your browser."); Console.WriteLine("Trying to connect:"); Console.WriteLine(); GetResponseAsync("http://localhost:1234").GetAwaiter().GetResult(); Console.WriteLine(); Console.WriteLine("Press Enter to stop the server."); Console.ReadLine(); server.Stop().GetAwaiter().GetResult(); Console.ReadKey(); } static async Task GetResponseAsync(string url) { using (var client = new HttpClient()) { HttpResponseMessage responseMessage = await client.GetAsync(url); string responseHeaders = responseMessage.Headers.ToString(); string response = await responseMessage.Content.ReadAsStringAsync(); Console.WriteLine("Response headers:"); Console.WriteLine(responseHeaders); Console.WriteLine("Response body:"); Console.WriteLine(response); } } class AsyncHttpServer { readonly HttpListener _listener; const string RESPONSE_TEMPLATE = "<html><head><title>Test</title></head><body><h2>Test page</h2><h4>Today is: {0}</h4></body></html>"; public AsyncHttpServer(int portNumber) { _listener = new HttpListener(); _listener.Prefixes.Add(string.Format("http://+:{0}/", portNumber)); } public async Task Start() { _listener.Start(); while (true) { var ctx = await _listener.GetContextAsync(); Console.WriteLine("Client connected..."); var response = string.Format(RESPONSE_TEMPLATE, DateTime.Now); using (var sw = new StreamWriter(ctx.Response.OutputStream)) { await sw.WriteAsync(response); await sw.FlushAsync(); } } } public async Task Stop() { _listener.Abort(); } } }
這裏咱們經過HttpListener類實現了一個很是簡單的web服務器。也使用了TcpListener類進行TCP套接字10操做。咱們配置該監聽器接收任何主機到本地機器1234端口的鏈接。而後在單獨的工做線程中啓動該監聽器,從而在主線程中能夠控制該監聽器。
當使用GetContextAsync方法時會發生異步I/O操做。遺憾的是,其並不接收, CancellationToken從而實現取消功能。因此若是想關閉該服務器,只需調用listener.Abort.方法,這將丟棄全部鏈接並關閉該服務器。
爲了對該服務器執行一個異步請求,咱們使用了統一命名空間下的System.Net.Http集合中的HttpClient類。咱們使用Get.Async方法來發起一個異步的HTTP GET請求。還有其餘的方法用於發起其餘HTTP請求,好比POST, DELETE以及PUT, HttpClient還有不少其餘,的選項,好比使用不一樣的格式(好比XML和JSON)來序列化和反序列化對象,指定代理服,務器地址,認證以及其餘配置。
當運行該程序時,能夠看到該服務器被啓動起來。在服務器端代碼中,咱們使用, GetContextAsync方法來接收新的客戶端鏈接。當有新的客戶端鏈接時該方法就會返回,我,們簡單的輸出一個包含當前日期和時間的很是基礎的HTML做爲響應。而後咱們請求服務器,並打印出響應頭和內容。你也能夠打開瀏覽器訪問http://localhost:1234/地址。你將看到相同的響應結果顯示在瀏覽器窗口。
本節演示了建立數據庫,以及異步地操做數據、讀取數據的過程。
class Program { static void Main(string[] args) { const string dataBaseName = "CustomDatabase"; var t = ProcessAsynchronousIO(dataBaseName); t.GetAwaiter().GetResult(); Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } async static Task ProcessAsynchronousIO(string dbName) { try { const string connectionString = @"Data Source=(LocalDB)\v11.0;Initial Catalog=master;Integrated Security=True"; string outputFolder = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); string dbFileName = Path.Combine(outputFolder, string.Format(@".\{0}.mdf", dbName)); string dbLogFileName = Path.Combine(outputFolder, string.Format(@".\{0}_log.ldf", dbName)); string dbConnectionString = string.Format(@"Data Source=(LocalDB)\v11.0;AttachDBFileName={1};Initial Catalog={0};Integrated Security=True;", dbName, dbFileName); using (var connection = new SqlConnection(connectionString)) { await connection.OpenAsync(); if (File.Exists(dbFileName)) { Console.WriteLine("Detaching the database..."); var detachCommand = new SqlCommand("sp_detach_db", connection); detachCommand.CommandType = CommandType.StoredProcedure; detachCommand.Parameters.AddWithValue("@dbname", dbName); await detachCommand.ExecuteNonQueryAsync(); Console.WriteLine("The database was detached succesfully."); Console.WriteLine("Deleteing the database..."); if(File.Exists(dbLogFileName)) File.Delete(dbLogFileName); File.Delete(dbFileName); Console.WriteLine("The database was deleted succesfully."); } Console.WriteLine("Creating the database..."); string createCommand = String.Format("CREATE DATABASE {0} ON (NAME = N'{0}', FILENAME = '{1}')", dbName, dbFileName); var cmd = new SqlCommand(createCommand, connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("The database was created succesfully"); } using (var connection = new SqlConnection(dbConnectionString)) { await connection.OpenAsync(); var cmd = new SqlCommand("SELECT newid()", connection); var result = await cmd.ExecuteScalarAsync(); Console.WriteLine("New GUID from DataBase: {0}", result); cmd = new SqlCommand(@"CREATE TABLE [dbo].[CustomTable]( [ID] [int] IDENTITY(1,1) NOT NULL, [Name] [nvarchar](50) NOT NULL, CONSTRAINT [PK_ID] PRIMARY KEY CLUSTERED ([ID] ASC) ON [PRIMARY]) ON [PRIMARY]", connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("Table was created succesfully."); cmd = new SqlCommand(@"INSERT INTO [dbo].[CustomTable] (Name) VALUES ('John'); INSERT INTO [dbo].[CustomTable] (Name) VALUES ('Peter'); INSERT INTO [dbo].[CustomTable] (Name) VALUES ('James'); INSERT INTO [dbo].[CustomTable] (Name) VALUES ('Eugene');", connection); await cmd.ExecuteNonQueryAsync(); Console.WriteLine("Inserted data succesfully"); Console.WriteLine("Reading data from table..."); cmd = new SqlCommand(@"SELECT * FROM [dbo].[CustomTable]", connection); using (SqlDataReader reader = await cmd.ExecuteReaderAsync()) { while (await reader.ReadAsync()) { var id = reader.GetFieldValue<int>(0); var name = reader.GetFieldValue<string>(1); Console.WriteLine("Table row: Id {0}, Name {1}", id, name); } } } } catch(Exception ex) { Console.WriteLine("Error: {0}", ex.Message); } } }
該程序使用了一個軟件,叫作SOL Server 2012 LocalDb,安裝Visual Studio 2012時會附帶安裝它,應該能正常使用。可是若是有什麼錯誤,你能夠經過安裝嚮導來修復該組件。
先要配置數據庫文件的存放路徑。咱們將數據庫文件放置在應用程序執行目錄中。有兩個文件,一個是數據庫自己,另外一個是事務日誌文件。咱們也配置了兩個鏈接字符串來定義如何鏈接數據庫。第一個字符串是鏈接到LocalDb引擎來分離數據庫。若是數據庫已經存在、則刪除並重建。當打開鏈接以及單獨使用OpenAsync和ExecuteNonQueryAsync方法執,行SQL命令時、咱們使用了10異步操做。
在該任務完成後,咱們附加了一個最新建立的數據庫。咱們建立了一張新的表並插入了一些數據。除了以前提到的方法,咱們還使用了ExecuteScalarAsync來異步地從數據庫引擎中獲得一個標量值,而且使用SqIDataReaderReadAsync方法來從數據庫表中異步地讀取數據行。
若是在數據庫有一個大數據量的表,裏面數據行中包含大數據量的二進制值,能夠使用CommandBehavior.SequentialAcess枚舉來建立數據閱讀器異步地經過數據閱讀器獲取大字段值。,並使用GetFieldValueAsync方法
本節描述瞭如何建立一個WCF服務,並宿主在命令行應用程序中。客戶端能夠訪問服務元數據,並以異步的方式消費它
請執行如下步驟來了解如何使用WCF服務:
using System; using System.ServiceModel; using System.ServiceModel.Description; using System.Threading.Tasks;
const string SERVICE_URL = "http://localhost:1234/HelloWorld"; static async Task RunServiceClient() { var endpoint = new EndpointAddress(SERVICE_URL); var channel = ChannelFactory<IHelloWorldServiceClient>.CreateChannel(new BasicHttpBinding(), endpoint); var greeting = await channel.GreetAsync("Eugene"); Console.WriteLine(greeting); } [ServiceContract(Namespace = "Packt", Name = "HelloWorldServiceContract")] public interface IHelloWorldService { [OperationContract] string Greet(string name); } [ServiceContract(Namespace = "Packt", Name = "HelloWorldServiceContract")] public interface IHelloWorldServiceClient { [OperationContract] string Greet(string name); [OperationContract] Task<string> GreetAsync(string name); } public class HelloWorldService : IHelloWorldService { public string Greet(string name) { return string.Format("Greetings, {0}!", name); } }
ServiceHost host = null; try { host = new ServiceHost(typeof (HelloWorldService), new Uri(SERVICE_URL)); var metadata = host.Description.Behaviors.Find<ServiceMetadataBehavior>(); if (null == metadata) { metadata = new ServiceMetadataBehavior(); } metadata.HttpGetEnabled = true; metadata.MetadataExporter.PolicyVersion = PolicyVersion.Policy15; host.Description.Behaviors.Add(metadata); host.AddServiceEndpoint(ServiceMetadataBehavior.MexContractName, MetadataExchangeBindings.CreateMexHttpBinding(),"mex"); var endpoint = host.AddServiceEndpoint(typeof (IHelloWorldService), new BasicHttpBinding(), SERVICE_URL); host.Faulted += (sender, e) => Console.WriteLine("Error!"); host.Open(); Console.WriteLine("Greeting service is running and listening on:"); Console.WriteLine("{0} ({1})", endpoint.Address, endpoint.Binding.Name); var client = RunServiceClient(); client.GetAwaiter().GetResult(); Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine("Error in catch block: {0}", ex); } finally { if (null != host) { if (host.State == CommunicationState.Faulted) { host.Abort(); } else { host.Close(); } } }
Windows Communication Foundation (簡稱WCF)是一個框架,用於以不一樣的方式調用,遠程服務。其中一個有一段時間很是流行,用於經過HTTP使用基於XML的協議來調用遠,程服務,它叫作簡單對象訪問協議(Simple Object Access Protocol,簡稱SOAP)。
Visual Studio 2012對WCF服務有着很是豐富的支持。例如,你能夠使用添加服務引用,菜單項給這樣的服務添加引用。你也可對本節中的服務使用此功能,由於咱們提供了服務元數據。
爲了建立這樣的服務,咱們須要使用ServiceHost類來宿主咱們的服務。咱們經過提供,一個服務實現類型和服務地址URL來描述如何宿主服務。而後配置了元數據終端和服務終,端。最後,使用Faulted事件來處理錯誤,並運行該宿主服務。
爲了消費該服務,咱們建立了一個客戶端,這是主要的技巧所在。在服務器端,咱們有,.一個服務,是一個普通的同步方法,叫作Greet,服務契約1HelloWorldService定義了該方,法。然而,若是想使用異步網絡1O,咱們須要異步地調用該方法。能夠經過使用匹配的命名空間和服務名來建立一個新的服務契約,而後同時定義同步方法和基於任務的異步方法。儘管事實上在服務器端咱們沒有異步方法,可是若是咱們遵循命名約定, WCF基礎設施明白,咱們想建立一個異步的代理方法。
所以,當咱們建立一個1HelloworldServiceClient代理渠道, WCF會正確地路由一個異步調用到該服務器端同步方法。若是你運行程序,而後打開瀏覽器並使用該服務的URL http://localhost: 1234/Helloworld來訪問該服務。你會看到該服務的描述,還能夠瀏覽XML元數據,該元數據可用於從Visual Studio 2012添加服務引用。若是你嘗試生成引用,將看到稍,微有點複雜的代碼,但它是自動建立的,而且易於使用。