引言html
本文主要從線程的基礎用法,CLR線程池當中工做者線程與I/O線程的開發,並行操做PLINQ等多個方面介紹多線程的開發。
其中委託的BeginInvoke方法以及回調函數最爲經常使用。
而 I/O線程可能容易遭到你們的忽略,其實在開發多線程系統,更應該多留意I/O線程的操做。特別是在ASP.NET開發當中,可能更多人只會留意在客戶端使用Ajax或者在服務器端使用UpdatePanel。其實合理使用I/O線程在通信項目或文件下載時,能儘量地減小IIS的壓力。
並行編程是Framework4.0中極力推廣的異步操做方式,更值得更深刻地學習。
但願本篇文章能對各位的學習研究有所幫助,當中有所錯漏的地方敬請點評。web
目錄數據庫
1、線程的定義編程
7、並行編程與PLINQ多線程
1、線程的定義
1. 1 進程、應用程序域與線程的關係
進程(Process)是Windows系統中的一個基本概念,它包含着一個運行程序所須要的資源。進程之間是相對獨立的,一個進程沒法訪問另外一個進程的數據(除非利用分佈式計算方式),一個進程運行的失敗也不會影響其餘進程的運行,Windows系統就是利用進程把工做劃分爲多個獨立的區域的。進程能夠理解爲一個程序的基本邊界。
應用程序域(AppDomain)是一個程序運行的邏輯區域,它能夠視爲一個輕量級的進程,.NET的程序集正是在應用程序域中運行的,一個進程能夠包含有多個應用程序域,一個應用程序域也能夠包含多個程序集。在一個應用程序域中包含了一個或多個上下文context,使用上下文CLR就可以把某些特殊對象的狀態放置在不一樣容器當中。
線程(Thread)是進程中的基本執行單元,在進程入口執行的第一個線程被視爲這個進程的主線程。在.NET應用程序中,都是以Main()方法做爲入口的,當調用此方法時系統就會自動建立一個主線程。線程主要是由CPU寄存器、調用棧和線程本地存儲器(Thread Local Storage,TLS)組成的。CPU寄存器主要記錄當前所執行線程的狀態,調用棧主要用於維護線程所調用到的內存與數據,TLS主要用於存放線程的狀態信息。
進程、應用程序域、線程的關係以下圖,一個進程內能夠包括多個應用程序域,也有包括多個線程,線程也能夠穿梭於多個應用程序域當中。但在同一個時刻,線程只會處於一個應用程序域內。
因爲本文是以介紹多線程技術爲主題,對進程、應用程序域的介紹就到此爲止。關於進程、線程、應用程序域的技術,在「C#綜合揭祕——細說進程、應用程序域與上下文」會有詳細介紹。
1.2 多線程
在單CPU系統的一個單位時間(time slice)內,CPU只能運行單個線程,運行順序取決於線程的優先級別。若是在單位時間內線程未能完成執行,系統就會把線程的狀態信息保存到線程的本地存儲器(TLS) 中,以便下次執行時恢復執行。而多線程只是系統帶來的一個假像,它在多個單位時間內進行多個線程的切換。由於切換頻密並且單位時間很是短暫,因此多線程可被視做同時運行。
適當使用多線程能提升系統的性能,好比:在系統請求大容量的數據時使用多線程,把數據輸出工做交給異步線程,使主線程保持其穩定性去處理其餘問題。但須要注意一點,由於CPU須要花費很多的時間在線程的切換上,因此過多地使用多線程反而會致使性能的降低。
2、線程的基礎知識
2.1 System.Threading.Thread類
System.Threading.Thread是用於控制線程的基礎類,經過Thread能夠控制當前應用程序域中線程的建立、掛起、中止、銷燬。
它包括如下經常使用公共屬性:
屬性名稱 | 說明 |
---|---|
CurrentContext | 獲取線程正在其中執行的當前上下文。 |
CurrentThread | 獲取當前正在運行的線程。 |
ExecutionContext | 獲取一個 ExecutionContext 對象,該對象包含有關當前線程的各類上下文的信息。 |
IsAlive | 獲取一個值,該值指示當前線程的執行狀態。 |
IsBackground | 獲取或設置一個值,該值指示某個線程是否爲後臺線程。 |
IsThreadPoolThread | 獲取一個值,該值指示線程是否屬於託管線程池。 |
ManagedThreadId | 獲取當前託管線程的惟一標識符。 |
Name | 獲取或設置線程的名稱。 |
Priority | 獲取或設置一個值,該值指示線程的調度優先級。 |
ThreadState | 獲取一個值,該值包含當前線程的狀態。 |
2.1.1 線程的標識符
ManagedThreadId是確認線程的惟一標識符,程序在大部分狀況下都是經過Thread.ManagedThreadId來辨別線程的。而Name是一個可變值,在默認時候,Name爲一個空值 Null,開發人員能夠經過程序設置線程的名稱,但這只是一個輔助功能。
2.1.2 線程的優先級別
.NET爲線程設置了Priority屬性來定義線程執行的優先級別,裏面包含5個選項,其中Normal是默認值。除非系統有特殊要求,不然不該該隨便設置線程的優先級別。
成員名稱 | 說明 |
---|---|
Lowest | 能夠將 Thread 安排在具備任何其餘優先級的線程以後。 |
BelowNormal | 能夠將 Thread 安排在具備 Normal 優先級的線程以後,在具備 Lowest 優先級的線程以前。 |
Normal | 默認選擇。能夠將 Thread 安排在具備 AboveNormal 優先級的線程以後,在具備BelowNormal 優先級的線程以前。 |
AboveNormal | 能夠將 Thread 安排在具備 Highest 優先級的線程以後,在具備 Normal 優先級的線程以前。 |
Highest | 能夠將 Thread 安排在具備任何其餘優先級的線程以前。 |
2.1.3 線程的狀態
經過ThreadState能夠檢測線程是處於Unstarted、Sleeping、Running 等等狀態,它比 IsAlive 屬性能提供更多的特定信息。
前面說過,一個應用程序域中可能包括多個上下文,而經過CurrentContext能夠獲取線程當前的上下文。
CurrentThread是最經常使用的一個屬性,它是用於獲取當前運行的線程。
2.1.4 System.Threading.Thread的方法
Thread 中包括了多個方法來控制線程的建立、掛起、中止、銷燬,之後來的例子中會常用。
方法名稱 | 說明 |
---|---|
Abort() | 終止本線程。 |
GetDomain() | 返回當前線程正在其中運行的當前域。 |
GetDomainId() | 返回當前線程正在其中運行的當前域Id。 |
Interrupt() | 中斷處於 WaitSleepJoin 線程狀態的線程。 |
Join() | 已重載。 阻塞調用線程,直到某個線程終止時爲止。 |
Resume() | 繼續運行已掛起的線程。 |
Start() | 執行本線程。 |
Suspend() | 掛起當前線程,若是當前線程已屬於掛起狀態則此不起做用 |
Sleep() | 把正在運行的線程掛起一段時間。 |
2.1.5 開發實例
如下這個例子,就是經過Thread顯示當前線程信息
運行結果
2.2 System.Threading 命名空間
在System.Threading命名空間內提供多個方法來構建多線程應用程序,其中ThreadPool與Thread是多線程開發中最經常使用到的,在.NET中專門設定了一個CLR線程池專門用於管理線程的運行,這個CLR線程池正是經過ThreadPool類來管理。而Thread是管理線程的最直接方式,下面幾節將詳細介紹有關內容。
類 | 說明 |
AutoResetEvent | 通知正在等待的線程已發生事件。沒法繼承此類。 |
ExecutionContext | 管理當前線程的執行上下文。沒法繼承此類。 |
Interlocked | 爲多個線程共享的變量提供原子操做。 |
Monitor | 提供同步對對象的訪問的機制。 |
Mutex | 一個同步基元,也可用於進程間同步。 |
Thread | 建立並控制線程,設置其優先級並獲取其狀態。 |
ThreadAbortException | 在對 Abort 方法進行調用時引起的異常。沒法繼承此類。 |
ThreadPool | 提供一個線程池,該線程池可用於發送工做項、處理異步 I/O、表明其餘線程等待以及處理計時器。 |
Timeout | 包含用於指定無限長的時間的常數。沒法繼承此類。 |
Timer | 提供以指定的時間間隔執行方法的機制。沒法繼承此類。 |
WaitHandle | 封裝等待對共享資源的獨佔訪問的操做系統特定的對象。 |
在System.Threading中的包含了下表中的多個經常使用委託,其中ThreadStart、ParameterizedThreadStart是最經常使用到的委託。
由ThreadStart生成的線程是最直接的方式,但由ThreadStart所生成並不受線程池管理。
而ParameterizedThreadStart是爲異步觸發帶參數的方法而設的,在下一節將爲你們逐一細說。
委託 | 說明 |
---|---|
ContextCallback | 表示要在新上下文中調用的方法。 |
ParameterizedThreadStart | 表示在 Thread 上執行的方法。 |
ThreadExceptionEventHandler | 表示將要處理 Application 的 ThreadException 事件的方法。 |
ThreadStart | 表示在 Thread 上執行的方法。 |
TimerCallback | 表示處理來自 Timer 的調用的方法。 |
WaitCallback | 表示線程池線程要執行的回調方法。 |
WaitOrTimerCallback | 表示當 WaitHandle 超時或終止時要調用的方法。 |
2.3 線程的管理方式
經過ThreadStart來建立一個新線程是最直接的方法,但這樣建立出來的線程比較難管理,若是建立過多的線程反而會讓系統的性能下載。有見及此,.NET爲線程管理專門設置了一個CLR線程池,使用CLR線程池系統能夠更合理地管理線程的使用。全部請求的服務都能運行於線程池中,當運行結束時線程便會迴歸到線程池。經過設置,能控制線程池的最大線程數量,在請求超出線程最大值時,線程池能按照操做的優先級別來執行,讓部分操做處於等待狀態,待有線程迴歸時再執行操做。
基礎知識就爲你們介紹到這裏,下面將詳細介紹多線程的開發。
3、以ThreadStart方式實現多線程
3.1 使用ThreadStart委託
這裏先以一個例子體現一下多線程帶來的好處,首先在Message類中創建一個方法ShowMessage(),裏面顯示了當前運行線程的Id,並使用Thread.Sleep(int ) 方法模擬部分工做。在main()中經過ThreadStart委託綁定Message對象的ShowMessage()方法,而後經過Thread.Start()執行異步方法。
1 public class Message
2 {
3 public void ShowMessage()
4 {
5 string message = string.Format("Async threadId is :{0}",
6 Thread.CurrentThread.ManagedThreadId);
7 Console.WriteLine(message);
8
9 for (int n = 0; n < 10; n++)
10 {
11 Thread.Sleep(300);
12 Console.WriteLine("The number is:" + n.ToString());
13 }
14 }
15 }
16
17 class Program
18 {
19 static void Main(string[] args)
20 {
21 Console.WriteLine("Main threadId is:"+
22 Thread.CurrentThread.ManagedThreadId);
23 Message message=new Message();
24 Thread thread = new Thread(new ThreadStart(message.ShowMessage));
25 thread.Start();
26 Console.WriteLine("Do something ..........!");
27 Console.WriteLine("Main thread working is complete!");
28
29 }
30 }
請注意運行結果,在調用Thread.Start()方法後,系統以異步方式運行Message.ShowMessage(),而主線程的操做是繼續執行的,在Message.ShowMessage()完成前,主線程已完成全部的操做。
3.2 使用ParameterizedThreadStart委託
ParameterizedThreadStart委託與ThreadStart委託很是類似,但ParameterizedThreadStart委託是面向帶參數方法的。注意ParameterizedThreadStart 對應方法的參數爲object,此參數能夠爲一個值對象,也能夠爲一個自定義對象。
1 public class Person
2 {
3 public string Name
4 {
5 get;
6 set;
7 }
8 public int Age
9 {
10 get;
11 set;
12 }
13 }
14
15 public class Message
16 {
17 public void ShowMessage(object person)
18 {
19 if (person != null)
20 {
21 Person _person = (Person)person;
22 string message = string.Format("\n{0}'s age is {1}!\nAsync threadId is:{2}",
23 _person.Name,_person.Age,Thread.CurrentThread.ManagedThreadId);
24 Console.WriteLine(message);
25 }
26 for (int n = 0; n < 10; n++)
27 {
28 Thread.Sleep(300);
29 Console.WriteLine("The number is:" + n.ToString());
30 }
31 }
32 }
33
34 class Program
35 {
36 static void Main(string[] args)
37 {
38 Console.WriteLine("Main threadId is:"+Thread.CurrentThread.ManagedThreadId);
39
40 Message message=new Message();
41 //綁定帶參數的異步方法
42 Thread thread = new Thread(new ParameterizedThreadStart(message.ShowMessage));
43 Person person = new Person();
44 person.Name = "Jack";
45 person.Age = 21;
46 thread.Start(person); //啓動異步線程
47
48 Console.WriteLine("Do something ..........!");
49 Console.WriteLine("Main thread working is complete!");
50
51 }
52 }
運行結果:
3.3 前臺線程與後臺線程
注意以上兩個例子都沒有使用Console.ReadKey(),但系統依然會等待異步線程完成後纔會結束。這是由於使用Thread.Start()啓動的線程默認爲前臺線程,而系統必須等待全部前臺線程運行結束後,應用程序域纔會自動卸載。
在第二節曾經介紹過線程Thread有一個屬性IsBackground,經過把此屬性設置爲true,就能夠把線程設置爲後臺線程!這時應用程序域將在主線程完成時就被卸載,而不會等待異步線程的運行。
3.4 掛起線程
爲了等待其餘後臺線程完成後再結束主線程,就可使用Thread.Sleep()方法。
1 public class Message
2 {
3 public void ShowMessage()
4 {
5 string message = string.Format("\nAsync threadId is:{0}",
6 Thread.CurrentThread.ManagedThreadId);
7 Console.WriteLine(message);
8 for (int n = 0; n < 10; n++)
9 {
10 Thread.Sleep(300);
11 Console.WriteLine("The number is:" + n.ToString());
12 }
13 }
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 Console.WriteLine("Main threadId is:"+
21 Thread.CurrentThread.ManagedThreadId);
22
23 Message message=new Message();
24 Thread thread = new Thread(new ThreadStart(message.ShowMessage));
25 thread.IsBackground = true;
26 thread.Start();
27
28 Console.WriteLine("Do something ..........!");
29 Console.WriteLine("Main thread working is complete!");
30 Console.WriteLine("Main thread sleep!");
31 Thread.Sleep(5000);
32 }
33 }
運行結果以下,此時應用程序域將在主線程運行5秒後自動結束
但系統沒法預知異步線程須要運行的時間,因此用經過Thread.Sleep(int)阻塞主線程並非一個好的解決方法。有見及此,.NET專門爲等待異步線程完成開發了另外一個方法thread.Join()。把上面例子中的最後一行Thread.Sleep(5000)修改成 thread.Join() 就能保證主線程在異步線程thread運行結束後纔會終止。
3.5 Suspend 與 Resume (慎用)
Thread.Suspend()與 Thread.Resume()是在Framework1.0 就已經存在的老方法了,它們分別能夠掛起、恢復線程。但在Framework2.0中就已經明確排斥這兩個方法。這是由於一旦某個線程佔用了已有的資源,再使用Suspend()使線程長期處於掛起狀態,當在其餘線程調用這些資源的時候就會引發死鎖!因此在沒有必要的狀況下應該避免使用這兩個方法。
3.6 終止線程
若想終止正在運行的線程,可使用Abort()方法。在使用Abort()的時候,將引起一個特殊異常 ThreadAbortException 。
若想在線程終止前恢復線程的執行,能夠在捕獲異常後 ,在catch(ThreadAbortException ex){...} 中調用Thread.ResetAbort()取消終止。
而使用Thread.Join()能夠保證應用程序域等待異步線程結束後才終止運行。
1 static void Main(string[] args)
2 {
3 Console.WriteLine("Main threadId is:" +
4 Thread.CurrentThread.ManagedThreadId);
5
6 Thread thread = new Thread(new ThreadStart(AsyncThread));
7 thread.IsBackground = true;
8 thread.Start();
9 thread.Join();
10
11 }
12
13 //以異步方式調用
14 static void AsyncThread()
15 {
16 try
17 {
18 string message = string.Format("\nAsync threadId is:{0}",
19 Thread.CurrentThread.ManagedThreadId);
20 Console.WriteLine(message);
21
22 for (int n = 0; n < 10; n++)
23 {
24 //當n等於4時,終止線程
25 if (n >= 4)
26 {
27 Thread.CurrentThread.Abort(n);
28 }
29 Thread.Sleep(300);
30 Console.WriteLine("The number is:" + n.ToString());
31 }
32 }
33 catch (ThreadAbortException ex)
34 {
35 //輸出終止線程時n的值
36 if (ex.ExceptionState != null)
37 Console.WriteLine(string.Format("Thread abort when the number is: {0}!",
38 ex.ExceptionState.ToString()));
39
40 //取消終止,繼續執行線程
41 Thread.ResetAbort();
42 Console.WriteLine("Thread ResetAbort!");
43 }
44
45 //線程結束
46 Console.WriteLine("Thread Close!");
47 }
運行結果以下
4、CLR線程池的工做者線程
4.1 關於CLR線程池
使用ThreadStart與ParameterizedThreadStart創建新線程很是簡單,但經過此方法創建的線程難於管理,若創建過多的線程反而會影響系統的性能。
有見及此,.NET引入CLR線程池這個概念。CLR線程池並不會在CLR初始化的時候馬上創建線程,而是在應用程序要建立線程來執行任務時,線程池才初始化一個線程。線程的初始化與其餘的線程同樣。在完成任務之後,該線程不會自行銷燬,而是以掛起的狀態返回到線程池。直到應用程序再次向線程池發出請求時,線程池裏掛起的線程就會再度激活執行任務。這樣既節省了創建線程所形成的性能損耗,也可讓多個任務反覆重用同一線程,從而在應用程序生存期內節約大量開銷。
注意:經過CLR線程池所創建的線程老是默認爲後臺線程,優先級數爲ThreadPriority.Normal。
4.2 工做者線程與I/O線程
CLR線程池分爲工做者線程(workerThreads)與I/O線程 (completionPortThreads) 兩種,工做者線程是主要用做管理CLR內部對象的運做,I/O(Input/Output) 線程顧名思義是用於與外部系統交換信息,IO線程的細節將在下一節詳細說明。
經過ThreadPool.GetMax(out int workerThreads,out int completionPortThreads )和 ThreadPool.SetMax( int workerThreads, int completionPortThreads)兩個方法能夠分別讀取和設置CLR線程池中工做者線程與I/O線程的最大線程數。在Framework2.0中最大線程默認爲25*CPU數,在Framewok3.0、4.0中最大線程數默認爲250*CPU數,在近年 I3,I5,I7 CPU出現後,線程池的最大值通常默認爲1000、2000。
若想測試線程池中有多少的線程正在投入使用,能夠經過ThreadPool.GetAvailableThreads( out int workerThreads,out int completionPortThreads ) 方法。
使用CLR線程池的工做者線程通常有兩種方式,一是直接經過 ThreadPool.QueueUserWorkItem() 方法,二是經過委託,下面將逐一細說。
4.3 經過QueueUserWorkItem啓動工做者線程
ThreadPool線程池中包含有兩個靜態方法能夠直接啓動工做者線程:
一爲 ThreadPool.QueueUserWorkItem(WaitCallback)
二爲 ThreadPool.QueueUserWorkItem(WaitCallback,Object)
先把WaitCallback委託指向一個帶有Object參數的無返回值方法,再使用 ThreadPool.QueueUserWorkItem(WaitCallback) 就能夠異步啓動此方法,此時異步方法的參數被視爲null 。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //把CLR線程池的最大值設置爲1000
6 ThreadPool.SetMaxThreads(1000, 1000);
7 //顯示主線程啓動時線程池信息
8 ThreadMessage("Start");
9 //啓動工做者線程
10 ThreadPool.QueueUserWorkItem(new WaitCallback(AsyncCallback));
11 Console.ReadKey();
12 }
13
14 static void AsyncCallback(object state)
15 {
16 Thread.Sleep(200);
17 ThreadMessage("AsyncCallback");
18 Console.WriteLine("Async thread do work!");
19 }
20
21 //顯示線程現狀
22 static void ThreadMessage(string data)
23 {
24 string message = string.Format("{0}\n CurrentThreadId is {1}",
25 data, Thread.CurrentThread.ManagedThreadId);
26 Console.WriteLine(message);
27 }
28 }
運行結果
使用 ThreadPool.QueueUserWorkItem(WaitCallback,Object) 方法能夠把object對象做爲參數傳送到回調函數中。
下面例子中就是把一個string對象做爲參數發送到回調函數當中。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //把線程池的最大值設置爲1000
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 ThreadMessage("Start");
9 ThreadPool.QueueUserWorkItem(new WaitCallback(AsyncCallback),"Hello Elva");
10 Console.ReadKey();
11 }
12
13 static void AsyncCallback(object state)
14 {
15 Thread.Sleep(200);
16 ThreadMessage("AsyncCallback");
17
18 string data = (string)state;
19 Console.WriteLine("Async thread do work!\n"+data);
20 }
21
22 //顯示線程現狀
23 static void ThreadMessage(string data)
24 {
25 string message = string.Format("{0}\n CurrentThreadId is {1}",
26 data, Thread.CurrentThread.ManagedThreadId);
27 Console.WriteLine(message);
28 }
29 }
運行結果
經過ThreadPool.QueueUserWorkItem啓動工做者線程雖然是方便,但WaitCallback委託指向的必須是一個帶有Object參數的無返回值方法,這無疑是一種限制。若方法須要有返回值,或者帶有多個參數,這將多費周折。有見及此,.NET提供了另外一種方式去創建工做者線程,那就是委託。
4.4 委託類
使用CLR線程池中的工做者線程,最靈活最經常使用的方式就是使用委託的異步方法,在此先簡單介紹一下委託類。
當定義委託後,.NET就會自動建立一個表明該委託的類,下面能夠用反射方式顯示委託類的方法成員(對反射有興趣的朋友能夠先參考一下「.NET基礎篇——反射的奧妙」)
1 class Program
2 {
3 delegate void MyDelegate();
4
5 static void Main(string[] args)
6 {
7 MyDelegate delegate1 = new MyDelegate(AsyncThread);
8 //顯示委託類的幾個方法成員
9 var methods=delegate1.GetType().GetMethods();
10 if (methods != null)
11 foreach (MethodInfo info in methods)
12 Console.WriteLine(info.Name);
13 Console.ReadKey();
14 }
15 }
委託類包括如下幾個重要方法
1 public class MyDelegate:MulticastDelegate
2 {
3 public MyDelegate(object target, int methodPtr);
4 //調用委託方法
5 public virtual void Invoke();
6 //異步委託
7 public virtual IAsyncResult BeginInvoke(AsyncCallback callback,object state);
8 public virtual void EndInvoke(IAsyncResult result);
9 }
當調用Invoke()方法時,對應此委託的全部方法都會被執行。而BeginInvoke與EndInvoke則支持委託方法的異步調用,由BeginInvoke啓動的線程都屬於CLR線程池中的工做者線程,在下面將詳細說明。
4.5 利用BeginInvoke與EndInvoke完成異步委託方法
首先創建一個委託對象,經過IAsyncResult BeginInvoke(string name,AsyncCallback callback,object state) 異步調用委託方法,BeginInvoke 方法除最後的兩個參數外,其它參數都是與方法參數相對應的。經過 BeginInvoke 方法將返回一個實現了 System.IAsyncResult 接口的對象,以後就能夠利用EndInvoke(IAsyncResult ) 方法就能夠結束異步操做,獲取委託的運行結果。
1 class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //創建委託
10 MyDelegate myDelegate = new MyDelegate(Hello);
11 //異步調用委託,獲取計算結果
12 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
13 //完成主線程其餘工做
14 .............
15 //等待異步方法完成,調用EndInvoke(IAsyncResult)獲取運行結果
16 string data=myDelegate.EndInvoke(result);
17 Console.WriteLine(data);
18
19 Console.ReadKey();
20 }
21
22 static string Hello(string name)
23 {
24 ThreadMessage("Async Thread");
25 Thread.Sleep(2000); //虛擬異步工做
26 return "Hello " + name;
27 }
28
29 //顯示當前線程
30 static void ThreadMessage(string data)
31 {
32 string message = string.Format("{0}\n ThreadId is:{1}",
33 data,Thread.CurrentThread.ManagedThreadId);
34 Console.WriteLine(message);
35 }
36 }
運行結果
4.6 善用IAsyncResult
在以上例子中能夠看見,若是在使用myDelegate.BeginInvoke後當即調用myDelegate.EndInvoke,那在異步線程未完成工做之前主線程將處於阻塞狀態,等到異步線程結束獲取計算結果後,主線程才能繼續工做,這明顯沒法展現出多線程的優點。此時能夠好好利用IAsyncResult 提升主線程的工做性能,IAsyncResult有如下成員:
1 public interface IAsyncResult
2 {
3 object AsyncState {get;} //獲取用戶定義的對象,它限定或包含關於異步操做的信息。
4 WailHandle AsyncWaitHandle {get;} //獲取用於等待異步操做完成的 WaitHandle。
5 bool CompletedSynchronously {get;} //獲取異步操做是否同步完成的指示。
6 bool IsCompleted {get;} //獲取異步操做是否已完成的指示。
7 }
經過輪詢方式,使用IsCompleted屬性判斷異步操做是否完成,這樣在異步操做未完成前就可讓主線程執行另外的工做。
1 class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //創建委託
10 MyDelegate myDelegate = new MyDelegate(Hello);
11 //異步調用委託,獲取計算結果
12 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
13 //在異步線程未完成前執行其餘工做
14 while (!result.IsCompleted)
15 {
16 Thread.Sleep(200); //虛擬操做
17 Console.WriteLine("Main thead do work!");
18 }
19 string data=myDelegate.EndInvoke(result);
20 Console.WriteLine(data);
21
22 Console.ReadKey();
23 }
24
25 static string Hello(string name)
26 {
27 ThreadMessage("Async Thread");
28 Thread.Sleep(2000);
29 return "Hello " + name;
30 }
31
32 static void ThreadMessage(string data)
33 {
34 string message = string.Format("{0}\n ThreadId is:{1}",
35 data,Thread.CurrentThread.ManagedThreadId);
36 Console.WriteLine(message);
37 }
38 }
運行結果:
除此之外,也可使用WailHandle完成一樣的工做,WaitHandle裏面包含有一個方法WaitOne(int timeout),它能夠判斷委託是否完成工做,在工做未完成前主線程能夠繼續其餘工做。運行下面代碼可獲得與使用 IAsyncResult.IsCompleted 一樣的結果,並且更簡單方便 。
1 namespace Test
2 {
3 class Program
4 {
5 delegate string MyDelegate(string name);
6
7 static void Main(string[] args)
8 {
9 ThreadMessage("Main Thread");
10
11 //創建委託
12 MyDelegate myDelegate = new MyDelegate(Hello);
13
14 //異步調用委託,獲取計算結果
15 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
16
17 while (!result.AsyncWaitHandle.WaitOne(200))
18 {
19 Console.WriteLine("Main thead do work!");
20 }
21 string data=myDelegate.EndInvoke(result);
22 Console.WriteLine(data);
23
24 Console.ReadKey();
25 }
26
27 static string Hello(string name)
28 {
29 ThreadMessage("Async Thread");
30 Thread.Sleep(2000);
31 return "Hello " + name;
32 }
33
34 static void ThreadMessage(string data)
35 {
36 string message = string.Format("{0}\n ThreadId is:{1}",
37 data,Thread.CurrentThread.ManagedThreadId);
38 Console.WriteLine(message);
39 }
40 }
當要監視多個運行對象的時候,使用IAsyncResult.WaitHandle.WaitOne可就派不上用場了。
幸虧.NET爲WaitHandle準備了另外兩個靜態方法:WaitAny(waitHandle[], int)與WaitAll (waitHandle[] , int)。
其中WaitAll在等待全部waitHandle完成後再返回一個bool值。
而WaitAny是等待其中一個waitHandle完成後就返回一個int,這個int是表明已完成waitHandle在waitHandle[]中的數組索引。
下面就是使用WaitAll的例子,運行結果與使用 IAsyncResult.IsCompleted 相同。
1 class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //創建委託
10 MyDelegate myDelegate = new MyDelegate(Hello);
11
12 //異步調用委託,獲取計算結果
13 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
14
15 //此處可加入多個檢測對象
16 WaitHandle[] waitHandleList = new WaitHandle[] { result.AsyncWaitHandle,........ };
17 while (!WaitHandle.WaitAll(waitHandleList,200))
18 {
19 Console.WriteLine("Main thead do work!");
20 }
21 string data=myDelegate.EndInvoke(result);
22 Console.WriteLine(data);
23
24 Console.ReadKey();
25 }
26
27 static string Hello(string name)
28 {
29 ThreadMessage("Async Thread");
30 Thread.Sleep(2000);
31 return "Hello " + name;
32 }
33
34 static void ThreadMessage(string data)
35 {
36 string message = string.Format("{0}\n ThreadId is:{1}",
37 data,Thread.CurrentThread.ManagedThreadId);
38 Console.WriteLine(message);
39 }
40 }
4.7 回調函數
使用輪詢方式來檢測異步方法的狀態很是麻煩,並且效率不高,有見及此,.NET爲 IAsyncResult BeginInvoke(AsyncCallback , object)準備了一個回調函數。使用 AsyncCallback 就能夠綁定一個方法做爲回調函數,回調函數必須是帶參數 IAsyncResult 且無返回值的方法: void AsycnCallbackMethod(IAsyncResult result) 。在BeginInvoke方法完成後,系統就會調用AsyncCallback所綁定的回調函數,最後回調函數中調用 XXX EndInvoke(IAsyncResult result) 就能夠結束異步方法,它的返回值類型與委託的返回值一致。
1 class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //創建委託
10 MyDelegate myDelegate = new MyDelegate(Hello);
11 //異步調用委託,獲取計算結果
12 myDelegate.BeginInvoke("Leslie", new AsyncCallback(Completed), null);
13 //在啓動異步線程後,主線程能夠繼續工做而不須要等待
14 for (int n = 0; n < 6; n++)
15 Console.WriteLine(" Main thread do work!");
16 Console.WriteLine("");
17
18 Console.ReadKey();
19 }
20
21 static string Hello(string name)
22 {
23 ThreadMessage("Async Thread");
24 Thread.Sleep(2000); \\模擬異步操做
25 return "\nHello " + name;
26 }
27
28 static void Completed(IAsyncResult result)
29 {
30 ThreadMessage("Async Completed");
31
32 //獲取委託對象,調用EndInvoke方法獲取運行結果
33 AsyncResult _result = (AsyncResult)result;
34 MyDelegate myDelegate = (MyDelegate)_result.AsyncDelegate;
35 string data = myDelegate.EndInvoke(_result);
36 Console.WriteLine(data);
37 }
38
39 static void ThreadMessage(string data)
40 {
41 string message = string.Format("{0}\n ThreadId is:{1}",
42 data, Thread.CurrentThread.ManagedThreadId);
43 Console.WriteLine(message);
44 }
45 }
能夠看到,主線在調用BeginInvoke方法能夠繼續執行其餘命令,而無需再等待了,這無疑比使用輪詢方式判斷異步方法是否完成更有優點。
在異步方法執行完成後將會調用AsyncCallback所綁定的回調函數,注意一點,回調函數依然是在異步線程中執行,這樣就不會影響主線程的運行,這也使用回調函數最值得青昧的地方。
在回調函數中有一個既定的參數IAsyncResult,把IAsyncResult強制轉換爲AsyncResult後,就能夠經過 AsyncResult.AsyncDelegate 獲取原委託,再使用EndInvoke方法獲取計算結果。
運行結果以下:
若是想爲回調函數傳送一些外部信息,就能夠利用BeginInvoke(AsyncCallback,object)的最後一個參數object,它容許外部向回調函數輸入任何類型的參數。只須要在回調函數中利用 AsyncResult.AsyncState 就能夠獲取object對象。
1 class Program
2 {
3 public class Person
4 {
5 public string Name;
6 public int Age;
7 }
8
9 delegate string MyDelegate(string name);
10
11 static void Main(string[] args)
12 {
13 ThreadMessage("Main Thread");
14
15 //創建委託
16 MyDelegate myDelegate = new MyDelegate(Hello);
17
18 //創建Person對象
19 Person person = new Person();
20 person.Name = "Elva";
21 person.Age = 27;
22
23 //異步調用委託,輸入參數對象person, 獲取計算結果
24 myDelegate.BeginInvoke("Leslie", new AsyncCallback(Completed), person);
25
26 //在啓動異步線程後,主線程能夠繼續工做而不須要等待
27 for (int n = 0; n < 6; n++)
28 Console.WriteLine(" Main thread do work!");
29 Console.WriteLine("");
30
31 Console.ReadKey();
32 }
33
34 static string Hello(string name)
35 {
36 ThreadMessage("Async Thread");
37 Thread.Sleep(2000);
38 return "\nHello " + name;
39 }
40
41 static void Completed(IAsyncResult result)
42 {
43 ThreadMessage("Async Completed");
44
45 //獲取委託對象,調用EndInvoke方法獲取運行結果
46 AsyncResult _result = (AsyncResult)result;
47 MyDelegate myDelegate = (MyDelegate)_result.AsyncDelegate;
48 string data = myDelegate.EndInvoke(_result);
49 //獲取Person對象
50 Person person = (Person)result.AsyncState;
51 string message = person.Name + "'s age is " + person.Age.ToString();
52
53 Console.WriteLine(data+"\n"+message);
54 }
55
56 static void ThreadMessage(string data)
57 {
58 string message = string.Format("{0}\n ThreadId is:{1}",
59 data, Thread.CurrentThread.ManagedThreadId);
60 Console.WriteLine(message);
61 }
62 }
運行結果:
5、CLR線程池的I/O線程
在前一節所介紹的線程都屬於CLR線程池的工做者線程,這一節開始爲你們介紹一下CLR線程池的I/O線程
I/O 線程是.NET專爲訪問外部資源所設置的一種線程,由於訪問外部資源經常要受到外界因素的影響,爲了防止讓主線程受影響而長期處於阻塞狀態,.NET爲多個I/O操做都創建起了異步方法,例如:FileStream、TCP/IP、WebRequest、WebService等等,並且每一個異步方法的使用方式都很是相似,都是以BeginXXX爲開始,以EndXXX結束,下面爲你們一一解說。
5.1 異步讀寫 FileStream
須要在 FileStream 異步調用 I/O線程,必須使用如下構造函數創建 FileStream 對象,並把useAsync設置爲 true。
FileStream stream = new FileStream ( string path, FileMode mode, FileAccess access, FileShare share, int bufferSize,bool useAsync ) ;
其中 path 是文件的相對路徑或絕對路徑; mode 肯定如何打開或建立文件; access 肯定訪問文件的方式; share 肯定文件如何進程共享; bufferSize 是表明緩衝區大小,通常默認最小值爲8,在啓動異步讀取或寫入時,文件大小通常大於緩衝大小; userAsync表明是否啓動異步I/O線程。
5.1.1 異步寫入
FileStream中包含BeginWrite、EndWrite 方法能夠啓動I/O線程進行異步寫入。
public override IAsyncResult BeginWrite ( byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject )
public override void EndWrite (IAsyncResult asyncResult )
BeginWrite 返回值爲IAsyncResult, 使用方式與委託的BeginInvoke方法類似,最好就是使用回調函數,避免線程阻塞。在最後兩個參數中,參數AsyncCallback用於綁定回調函數; 參數Object用於傳遞外部數據。要注意一點:AsyncCallback所綁定的回調函數必須是帶單個 IAsyncResult 參數的無返回值方法。
在例子中,把FileStream做爲外部數據傳遞到回調函數當中,而後在回調函數中利用IAsyncResult.AsyncState獲取FileStream對象,最後經過FileStream.EndWrite(IAsyncResult)結束寫入。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //把線程池的最大值設置爲1000
6 ThreadPool.SetMaxThreads(1000, 1000);
7 ThreadPoolMessage("Start");
8
9 //新立文件File.sour
10 FileStream stream = new FileStream("File.sour", FileMode.OpenOrCreate,
11 FileAccess.ReadWrite,FileShare.ReadWrite,1024,true);
12 byte[] bytes = new byte[16384];
13 string message = "An operating-system ThreadId has no fixed relationship........";
14 bytes = Encoding.Unicode.GetBytes(message);
15
16 //啓動異步寫入
17 stream.BeginWrite(bytes, 0, (int)bytes.Length,new AsyncCallback(Callback),stream);
18 stream.Flush();
19
20 Console.ReadKey();
21 }
22
23 static void Callback(IAsyncResult result)
24 {
25 //顯示線程池現狀
26 Thread.Sleep(200);
27 ThreadPoolMessage("AsyncCallback");
28 //結束異步寫入
29 FileStream stream = (FileStream)result.AsyncState;
30 stream.EndWrite(result);
31 stream.Close();
32 }
33
34 //顯示線程池現狀
35 static void ThreadPoolMessage(string data)
36 {
37 int a, b;
38 ThreadPool.GetAvailableThreads(out a, out b);
39 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
40 "WorkerThreads is:{2} CompletionPortThreads is :{3}",
41 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42 Console.WriteLine(message);
43 }
44 }
由輸出結果能夠看到,在使用FileStream.BeginWrite方法後,系統將自動啓動CLR線程池中I/O線程。
5.1.2 異步讀取
FileStream 中包含 BeginRead 與 EndRead 能夠異步調用I/O線程進行讀取。
public override IAsyncResult BeginRead ( byte[] array,int offset,int numBytes, AsyncCallback userCallback,Object stateObject)
public override int EndRead(IAsyncResult asyncResult)
其使用方式與BeginWrite和EndWrite類似,AsyncCallback用於綁定回調函數; Object用於傳遞外部數據。在回調函數只須要使用IAsyncResut.AsyncState就可獲取外部數據。EndWrite 方法會返回從流讀取到的字節數量。
首先定義 FileData 類,裏面包含FileStream對象,byte[] 數組和長度。而後把FileData對象做爲外部數據傳到回調函數,在回調函數中,把IAsyncResult.AsyncState強制轉換爲FileData,而後經過FileStream.EndRead(IAsyncResult)結束讀取。最後比較一下長度,若讀取到的長度與輸入的數據長度不一至,則拋出異常。
1 class Program
2 {
3 public class FileData
4 {
5 public FileStream Stream;
6 public int Length;
7 public byte[] ByteData;
8 }
9
10 static void Main(string[] args)
11 {
12 //把線程池的最大值設置爲1000
13 ThreadPool.SetMaxThreads(1000, 1000);
14 ThreadPoolMessage("Start");
15 ReadFile();
16
17 Console.ReadKey();
18 }
19
20 static void ReadFile()
21 {
22 byte[] byteData=new byte[80961024];
23 FileStream stream = new FileStream("File1.sour", FileMode.OpenOrCreate,
24 FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
25
26 //把FileStream對象,byte[]對象,長度等有關數據綁定到FileData對象中,以附帶屬性方式送到回調函數
27 FileData fileData = new FileData();
28 fileData.Stream = stream;
29 fileData.Length = (int)stream.Length;
30 fileData.ByteData = byteData;
31
32 //啓動異步讀取
33 stream.BeginRead(byteData, 0, fileData.Length, new AsyncCallback(Completed), fileData);
34 }
35
36 static void Completed(IAsyncResult result)
37 {
38 ThreadPoolMessage("Completed");
39
40 //把AsyncResult.AsyncState轉換爲FileData對象,以FileStream.EndRead完成異步讀取
41 FileData fileData = (FileData)result.AsyncState;
42 int length=fileData.Stream.EndRead(result);
43 fileData.Stream.Close();
44
45 //若是讀取到的長度與輸入長度不一致,則拋出異常
46 if (length != fileData.Length)
47 throw new Exception("Stream is not complete!");
48
49 string data=Encoding.ASCII.GetString(fileData.ByteData, 0, fileData.Length);
50 Console.WriteLine(data.Substring(2,22));
51 }
52
53 //顯示線程池現狀
54 static void ThreadPoolMessage(string data)
55 {
56 int a, b;
57 ThreadPool.GetAvailableThreads(out a, out b);
58 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
59 "WorkerThreads is:{2} CompletionPortThreads is :{3}",
60 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
61 Console.WriteLine(message);
62 }
63
64 }
由輸出結果能夠看到,在使用FileStream.BeginRead方法後,系統將自動啓動CLR線程池中I/O線程。
5.2 異步操做TCP/IP套接字
在介紹 TCP/IP 套接字前先簡單介紹一下 NetworkStream 類,它是用於網絡訪問的基礎數據流。 NetworkStream 提供了好幾個方法控制套接字數據的發送與接收, 其中BeginRead、EndRead、BeginWrite、EndWrite 可以實現異步操做,並且異步線程是來自於CLR線程池的I/O線程。
public override int ReadByte ()
public override int Read (byte[] buffer,int offset, int size)
public override void WriteByte (byte value)
public override void Write (byte[] buffer,int offset, int size)
public override IAsyncResult BeginRead (byte [] buffer, int offset, int size, AsyncCallback callback, Object state )
public override int EndRead(IAsyncResult result)
public override IAsyncResult BeginWrite (byte [] buffer, int offset, int size, AsyncCallback callback, Object state )
public override void EndWrite(IAsyncResult result)
若要建立 NetworkStream,必須提供已鏈接的 Socket。而在.NET中使用TCP/IP套接字不須要直接與Socket打交道,由於.NET把Socket的大部分操做都放在System.Net.TcpListener和System.Net.Sockets.TcpClient裏面,這兩個類大大地簡化了Socket的操做。通常套接字對象Socket包含一個Accept()方法,此方法能產生阻塞來等待客戶端的請求,而在TcpListener類裏也包含了一個類似的方法 public TcpClient AcceptTcpClient()用於等待客戶端的請求。此方法將會返回一個TcpClient 對象,經過 TcpClient 的 public NetworkStream GetStream()方法就能獲取NetworkStream對象,控制套接字數據的發送與接收。
下面以一個例子說明異步調用TCP/IP套接字收發數據的過程。
首先在服務器端創建默認地址127.0.0.1用於收發信息,使用此地址與端口500新建TcpListener對象,調用TcpListener.Start 偵聽傳入的鏈接請求,再使用一個死循環來監聽信息。
在ChatClient類包括有接收信息與發送信息兩個功能:當接收到客戶端請求時,它會利用 NetworkStream.BeginRead 讀取客戶端信息,並在回調函數ReceiveAsyncCallback中輸出信息內容,若接收到的信息的大小小於1時,它將會拋出一個異常。當信息成功接收後,再使用 NetworkStream.BeginWrite 方法回饋信息到客戶端
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置CLR線程池最大線程數
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //默認地址爲127.0.0.1
9 IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
10 TcpListener tcpListener = new TcpListener(ipAddress, 500);
11 tcpListener.Start();
12
13 //以一個死循環來實現監聽
14 while (true)
15 { //調用一個ChatClient對象來實現監聽
16 ChatClient chatClient = new ChatClient(tcpListener.AcceptTcpClient());
17 }
18 }
19 }
20
21 public class ChatClient
22 {
23 static TcpClient tcpClient;
24 static byte[] byteMessage;
25 static string clientEndPoint;
26
27 public ChatClient(TcpClient tcpClient1)
28 {
29 tcpClient = tcpClient1;
30 byteMessage = new byte[tcpClient.ReceiveBufferSize];
31
32 //顯示客戶端信息
33 clientEndPoint = tcpClient.Client.RemoteEndPoint.ToString();
34 Console.WriteLine("Client's endpoint is " + clientEndPoint);
35
36 //使用NetworkStream.BeginRead異步讀取信息
37 NetworkStream networkStream = tcpClient.GetStream();
38 networkStream.BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,
39 new AsyncCallback(ReceiveAsyncCallback), null);
40 }
41
42 public void ReceiveAsyncCallback(IAsyncResult iAsyncResult)
43 {
44 //顯示CLR線程池狀態
45 Thread.Sleep(100);
46 ThreadPoolMessage("\nMessage is receiving");
47
48 //使用NetworkStream.EndRead結束異步讀取
49 NetworkStream networkStreamRead = tcpClient.GetStream();
50 int length=networkStreamRead.EndRead(iAsyncResult);
51
52 //若是接收到的數據長度少於1則拋出異常
53 if (length < 1)
54 {
55 tcpClient.GetStream().Close();
56 throw new Exception("Disconnection!");
57 }
58
59 //顯示接收信息
60 string message = Encoding.UTF8.GetString(byteMessage, 0, length);
61 Console.WriteLine("Message:" + message);
62
63 //使用NetworkStream.BeginWrite異步發送信息
64 byte[] sendMessage = Encoding.UTF8.GetBytes("Message is received!");
65 NetworkStream networkStreamWrite=tcpClient.GetStream();
66 networkStreamWrite.BeginWrite(sendMessage, 0, sendMessage.Length,
67 new AsyncCallback(SendAsyncCallback), null);
68 }
69
70 //把信息轉換成二進制數據,而後發送到客戶端
71 public void SendAsyncCallback(IAsyncResult iAsyncResult)
72 {
73 //顯示CLR線程池狀態
74 Thread.Sleep(100);
75 ThreadPoolMessage("\nMessage is sending");
76
77 //使用NetworkStream.EndWrite結束異步發送
78 tcpClient.GetStream().EndWrite(iAsyncResult);
79
80 //從新監聽
81 tcpClient.GetStream().BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,
82 new AsyncCallback(ReceiveAsyncCallback), null);
83 }
84
85 //顯示線程池現狀
86 static void ThreadPoolMessage(string data)
87 {
88 int a, b;
89 ThreadPool.GetAvailableThreads(out a, out b);
90 string message = string.Format("{0}\n CurrentThreadId is {1}\n " +
91 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
92 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
93
94 Console.WriteLine(message);
95 }
96 }
而在客戶端只是使用簡單的開發方式,利用TcpClient鏈接到服務器端,而後調用NetworkStream.Write方法發送信息,最後調用NetworkStream.Read方法讀取回饋信息
1 static void Main(string[] args)
2 {
3 //鏈接服務端
4 TcpClient tcpClient = new TcpClient("127.0.0.1", 500);
5
6 //發送信息
7 NetworkStream networkStream = tcpClient.GetStream();
8 byte[] sendMessage = Encoding.UTF8.GetBytes("Client request connection!");
9 networkStream.Write(sendMessage, 0, sendMessage.Length);
10 networkStream.Flush();
11
12 //接收信息
13 byte[] receiveMessage=new byte[1024];
14 int count=networkStream.Read(receiveMessage, 0,1024);
15 Console.WriteLine(Encoding.UTF8.GetString(receiveMessage));
16 Console.ReadKey();
17 }
注意觀察運行結果,服務器端的異步操做線程都是來自於CLR線程池的I/O線程
5.3 異步WebRequest
System.Net.WebRequest 是 .NET 爲實現訪問 Internet 的 「請求/響應模型」 而開發的一個 abstract 基類, 它主要有三個子類:FtpWebRequest、HttpWebRequest、FileWebRequest。當使用WebRequest.Create(string uri)建立對象時,應用程序就能夠根據請求協議判斷實現類來進行操做。FileWebRequest、FtpWebRequest、HttpWebRequest 各有其做用:FileWebRequest 使用 「file://路徑」 的URI方式實現對本地資源和內部文件的請求/響應、FtpWebRequest 使用FTP文件傳輸協議實現文件請求/響應、HttpWebRequest 用於處理HTTP的頁面請求/響應。因爲使用方法相相似,下面就以經常使用的HttpWebRequest爲例子介紹一下異步WebRequest的使用方法。
在使用ASP.NET開發網站的時候,每每會忽略了HttpWebRequest的使用,由於開發都假設客戶端是使用瀏覽器等工具去閱讀頁面的。但若是你對REST開發方式有所瞭解,那對 HttpWebRequest 就應該很是熟悉。它能夠在路徑參數、頭文件、頁面主體、Cookie 等多處地方加入請求條件,而後對回覆數據進行適當處理。HttpWebRequest 包含有如下幾個經常使用方法用於處理請求/響應:
public override Stream GetRequestStream ()
public override WebResponse GetResponse ()
public override IAsyncResult BeginGetRequestStream ( AsyncCallback callback, Object state )
public override Stream EndGetRequestStream ( IAsyncResult asyncResult )
public override IAsyncResult BeginGetResponse ( AsyncCallback callback, Object state )
public override WebResponse EndGetResponse ( IAsyncResult asyncResult )
其中BeginGetRequestStream、EndGetRequestStream 用於異步向HttpWebRequest對象寫入請求信息; BeginGetResponse、EndGetResponse 用於異步發送頁面請求並獲取返回信息。使用異步方式操做Internet的「請求/響應」,避免主線程長期處於等待狀態,而操做期間異步線程是來自CLR線程池的I/O線程。
下面以簡單的例子介紹一下異步請求的用法。
首先爲Person類加上可序列化特性,在服務器端創建Hanlder.ashx,經過Request.InputStream 獲取到請求數據並把數據轉化爲String對象,此實例中數據是以 「Id:1」 的形式實現傳送的。而後根據Id查找對應的Person對象,並把Person對象寫入Response.OutStream 中返還到客戶端。
在客戶端先把 HttpWebRequird.Method 設置爲 "post",使用異步方式經過BeginGetRequireStream獲取請求數據流,而後寫入請求數據 「Id:1」。再使用異步方法BeginGetResponse 獲取回覆數據,最後把數據反序列化爲Person對象顯示出來。
注意:HttpWebRequire.Method默認爲get,在寫入請求前必須把HttpWebRequire.Method設置爲post,不然在使用BeginGetRequireStream 獲取請求數據流的時候,系統就會發出 「沒法發送具備此謂詞類型的內容正文" 的異常。
Model
1 namespace Model
2 {
3 [Serializable]
4 public class Person
5 {
6 public int ID
7 {
8 get;
9 set;
10 }
11 public string Name
12 {
13 get;
14 set;
15 }
16 public int Age
17 {
18 get;
19 set;
20 }
21 }
22 }
服務器端
1 public class Handler : IHttpHandler {
2
3 public void ProcessRequest(HttpContext context)
4 {
5 //把信息轉換爲String,找出輸入條件Id
6 byte[] bytes=new byte[1024];
7 int length=context.Request.InputStream.Read(bytes,0,1024);
8 string condition = Encoding.Default.GetString(bytes);
9 int id = int.Parse(condition.Split(new string[] { ":" },
10 StringSplitOptions.RemoveEmptyEntries)[1]);
11
12 //根據Id查找對應Person對象
13 var person = GetPersonList().Where(x => x.ID == id).First();
14
15 //所Person格式化爲二進制數據寫入OutputStream
16 BinaryFormatter formatter = new BinaryFormatter();
17 formatter.Serialize(context.Response.OutputStream, person);
18 }
19
20 //模擬源數據
21 private IList<Person> GetPersonList()
22 {
23 var personList = new List<Person>();
24
25 var person1 = new Person();
26 person1.ID = 1;
27 person1.Name = "Leslie";
28 person1.Age = 30;
29 personList.Add(person1);
30 ...........
31 return personList;
32 }
33
34 public bool IsReusable
35 {
36 get { return true;}
37 }
38 }
客戶端
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 Request();
7 Console.ReadKey();
8 }
9
10 static void Request()
11 {
12 ThreadPoolMessage("Start");
13 //使用WebRequest.Create方法創建HttpWebRequest對象
14 HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create(
15 "http://localhost:5700/Handler.ashx");
16 webRequest.Method = "post";
17
18 //對寫入數據的RequestStream對象進行異步請求
19 IAsyncResult result=webRequest.BeginGetRequestStream(
20 new AsyncCallback(EndGetRequestStream),webRequest);
21 }
22
23 static void EndGetRequestStream(IAsyncResult result)
24 {
25 ThreadPoolMessage("RequestStream Complete");
26 //獲取RequestStream
27 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
28 Stream stream=webRequest.EndGetRequestStream(result);
29
30 //寫入請求條件
31 byte[] condition = Encoding.Default.GetBytes("Id:1");
32 stream.Write(condition, 0, condition.Length);
33
34 //異步接收回傳信息
35 IAsyncResult responseResult = webRequest.BeginGetResponse(
36 new AsyncCallback(EndGetResponse), webRequest);
37 }
38
39 static void EndGetResponse(IAsyncResult result)
40 {
41 //顯出線程池現狀
42 ThreadPoolMessage("GetResponse Complete");
43
44 //結束異步請求,獲取結果
45 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
46 WebResponse webResponse = webRequest.EndGetResponse(result);
47
48 //把輸出結果轉化爲Person對象
49 Stream stream = webResponse.GetResponseStream();
50 BinaryFormatter formatter = new BinaryFormatter();
51 var person=(Person)formatter.Deserialize(stream);
52 Console.WriteLine(string.Format("Person Id:{0} Name:{1} Age:{2}",
53 person.ID, person.Name, person.Age));
54 }
55
56 //顯示線程池現狀
57 static void ThreadPoolMessage(string data)
58 {
59 int a, b;
60 ThreadPool.GetAvailableThreads(out a, out b);
61 string message = string.Format("{0}\n CurrentThreadId is {1}\n " +
62 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
63 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
64
65 Console.WriteLine(message);
66 }
67 }
從運行結果能夠看到,BeginGetRequireStream、BeginGetResponse方法是使用CLR線程池的I/O線程。
5.4 異步調用WebService
相比TCP/IP套接字,在使用WebService的時候,服務器端須要更復雜的操做處理,使用時間每每會更長。爲了不客戶端長期處於等待狀態,在配置服務引用時選擇 「生成異步操做」,系統能夠自動創建異步調用的方式。
以.NET 2.0之前,系統都是使用ASMX來設計WebService,而近年來WCF可說是火熱登場,下面就以WCF爲例子簡單介紹一下異步調用WebService的例子。
因爲系統能夠自動生成異步方法,使用起來很是簡單,首先在服務器端創建服務ExampleService,裏面包含方法Method。客戶端引用此服務時,選擇 「生成異步操做」。而後使用 BeginMethod 啓動異步方法, 在回調函數中調用EndMethod結束異步調用。
服務端
1 [ServiceContract]
2 public interface IExampleService
3 {
4 [OperationContract]
5 string Method(string name);
6 }
7
8 public class ExampleService : IExampleService
9 {
10 public string Method(string name)
11 {
12 return "Hello " + name;
13 }
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 ServiceHost host = new ServiceHost(typeof(ExampleService));
21 host.Open();
22 Console.ReadKey();
23 host.Close();
24 }
25 }
26
27 <configuration>
28 <system.serviceModel>
29 <services>
30 <service name="Example.ExampleService">
31 <endpoint address="" binding="wsHttpBinding" contract="Example.IExampleService">
32 <identity>
33 <dns value="localhost" />
34 </identity>
35 </endpoint>
36 <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
37 <host>
38 <baseAddresses>
39 <add baseAddress="http://localhost:7200/Example/ExampleService/" />
40 </baseAddresses>
41 </host>
42 </service>
43 </services>
44 </system.serviceModel>
45 </configuration>
客戶端
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置最大線程數
6 ThreadPool.SetMaxThreads(1000, 1000);
7 ThreadPoolMessage("Start");
8
9 //創建服務對象,異步調用服務方法
10 ExampleServiceReference.ExampleServiceClient exampleService = new
11 ExampleServiceReference.ExampleServiceClient();
12 exampleService.BeginMethod("Leslie",new AsyncCallback(AsyncCallbackMethod),
13 exampleService);
14 Console.ReadKey();
15 }
16
17 static void AsyncCallbackMethod(IAsyncResult result)
18 {
19 Thread.Sleep(1000);
20 ThreadPoolMessage("Complete");
21 ExampleServiceReference.ExampleServiceClient example =
22 (ExampleServiceReference.ExampleServiceClient)result.AsyncState;
23 string data=example.EndMethod(result);
24 Console.WriteLine(data);
25 }
26
27 //顯示線程池現狀
28 static void ThreadPoolMessage(string data)
29 {
30 int a, b;
31 ThreadPool.GetAvailableThreads(out a, out b);
32 string message = string.Format("{0}\n CurrentThreadId is {1}\n " +
33 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
34 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
35
36 Console.WriteLine(message);
37 }
38 }
39
40 <configuration>
41 <system.serviceModel>
42 <bindings>
43 <wsHttpBinding>
44 <binding name="WSHttpBinding_IExampleService" closeTimeout="00:01:00"
45 openTimeout="00:01:00" receiveTimeout="00:10:00" sendTimeout="00:01:00"
46 bypassProxyOnLocal="false" transactionFlow="false"
47 hostNameComparisonMode="StrongWildcard" maxBufferPoolSize="524288"
48 maxReceivedMessageSize="65536" messageEncoding="Text" textEncoding="utf-8"
49 useDefaultWebProxy="true" allowCookies="false">
50 <readerQuotas maxDepth="32" maxStringContentLength="8192" maxArrayLength="16384"
51 maxBytesPerRead="4096" maxNameTableCharCount="16384" />
52 <reliableSession ordered="true" inactivityTimeout="00:10:00" enabled="false" />
53 <security mode="Message">
54 <transport clientCredentialType="Windows" proxyCredentialType="None"
55 realm="" />
56 <message clientCredentialType="Windows" negotiateServiceCredential="true"
57 algorithmSuite="Default" />
58 </security>
59 </binding>
60 </wsHttpBinding>
61 </bindings>
62 <client>
63 <endpoint address="http://localhost:7200/Example/ExampleService/"
64 binding="wsHttpBinding" bindingConfiguration="WSHttpBinding_IExampleService"
65 contract="ExampleServiceReference.IExampleService"
66 name="WSHttpBinding_IExampleService">
67 <identity>
68 <dns value="localhost" />
69 </identity>
70 </endpoint>
71 </client>
72 </system.serviceModel>
73 </configuration>
注意觀察運行結果,異步調用服務時,回調函數都是運行於CLR線程池的I/O線程當中。
6、異步 SqlCommand
從ADO.NET 2.0開始,SqlCommand就新增了幾個異步方法執行SQL命令。相對於同步執行方式,它使主線程不須要等待數據庫的返回結果,在使用複雜性查詢或批量插入時將有效提升主線程的效率。使用異步SqlCommand的時候,請注意把ConnectionString 的 Asynchronous Processing 設置爲 true 。
注意:SqlCommand異步操做的特別之處在於線程並不依賴於CLR線程池,而是由Windows內部提供,這比使用異步委託更有效率。但若是須要使用回調函數的時候,回調函數的線程依然是來自於CLR線程池的工做者線程。
SqlCommand有如下幾個方法支持異步操做:
public IAsyncResult BeginExecuteNonQuery (......)
public int EndExecuteNonQuery(IAsyncResult)
public IAsyncResult BeginExecuteReader(......)
public SqlDataReader EndExecuteReader(IAsyncResult)
public IAsyncResult BeginExecuteXmlReader (......)
public XmlReader EndExecuteXmlReader(IAsyncResult)
因爲使用方式類似,此處就以 BeginExecuteNonQuery 爲例子,介紹一下異步SqlCommand的使用。首先創建connectionString,注意把Asynchronous Processing設置爲true來啓動異步命令,而後把SqlCommand.CommandText設置爲 WAITFOR DELAY "0:0:3" 來虛擬數據庫操做。再經過BeginExecuteNonQuery啓動異步操做,利用輪詢方式監測操做狀況。最後在操做完成後使用EndExecuteNonQuery完成異步操做。
1 class Program
2 {
3 //把Asynchronous Processing設置爲true
4 static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;「+
5 "Integrated Security=True;Asynchronous Processing=true";
6
7 static void Main(string[] args)
8 {
9 //把CLR線程池最大線程數設置爲1000
10 ThreadPool.SetMaxThreads(1000, 1000);
11 ThreadPoolMessage("Start");
12
13 //使用WAITFOR DELAY命令來虛擬操做
14 SqlConnection connection = new SqlConnection(connectionString);
15 SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection);
16 connection.Open();
17
18 //啓動異步SqlCommand操做,利用輪詢方式監測操做
19 IAsyncResult result = command.BeginExecuteNonQuery();
20 ThreadPoolMessage("BeginRead");
21 while (!result.AsyncWaitHandle.WaitOne(500))
22 Console.WriteLine("Main thread do work........");
23
24 //結束異步SqlCommand
25 int count= command.EndExecuteNonQuery(result);
26 ThreadPoolMessage("\nCompleted");
27 Console.ReadKey();
28 }
29
30 //顯示線程池現狀
31 static void ThreadPoolMessage(string data)
32 {
33 int a, b;
34 ThreadPool.GetAvailableThreads(out a, out b);
35 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
36 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
37 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
38 Console.WriteLine(message);
39 }
40 }
注意運行結果,SqlCommand的異步執行線程並不屬於CLR線程池。
若是以爲使用輪詢方式過於麻煩,可使用回調函數,但要注意當調用回調函數時,線程是來自於CLR線程池的工做者線程。
1 class Program
2 {
3 //把Asynchronous Processing設置爲true
4 static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;」+
5 「Integrated Security=True;Asynchronous Processing=true";
6 static void Main(string[] args)
7 {
8 //把CLR線程池最大線程數設置爲1000
9 ThreadPool.SetMaxThreads(1000, 1000);
10 ThreadPoolMessage("Start");
11
12 //使用WAITFOR DELAY命令來虛擬操做
13 SqlConnection connection = new SqlConnection(connectionString);
14 SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection);
15 connection.Open();
16
17 //啓動異步SqlCommand操做,並把SqlCommand對象傳遞到回調函數
18 IAsyncResult result = command.BeginExecuteNonQuery(
19 new AsyncCallback(AsyncCallbackMethod),command);
20 Console.ReadKey();
21 }
22
23 static void AsyncCallbackMethod(IAsyncResult result)
24 {
25 Thread.Sleep(200);
26 ThreadPoolMessage("AsyncCallback");
27 SqlCommand command = (SqlCommand)result.AsyncState;
28 int count=command.EndExecuteNonQuery(result);
29 command.Connection.Close();
30 }
31
32 //顯示線程池現狀
33 static void ThreadPoolMessage(string data)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
38 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
39 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
40
41 Console.WriteLine(message);
42 }
43 }
運行結果:
7、並行編程與PLINQ
要使用多線程開發,必須很是熟悉Thread的使用,並且在開發過程當中可能會面對不少未知的問題。爲了簡化開發,.NET 4.0 特別提供一個並行編程庫System.Threading.Tasks,它能夠簡化並行開發,你無需直接跟線程或線程池打交道,就能夠簡單創建多線程應用程序。此外,.NET還提供了新的一組擴展方法PLINQ,它具備自動分析查詢功能,若是並行查詢能提升系統效率,則同時運行,若是查詢未能從並行查詢中受益,則按原順序查詢。下面將詳細介紹並行操做的方式。
7.1 泛型委託
使用並行編程能夠同時操做多個委託,在介紹並行編程前先簡單介紹一下兩個泛型委託System.Func<>與System.Action<>。
Func<>是一個能接受多個參數和一個返回值的泛型委託,它能接受0個到16個輸入參數, 其中 T1,T2,T3,T4......T16 表明自定的輸入類型,TResult爲自定義的返回值。
public delegate TResult Func<TResult>()
public delegate TResult Func<T1,TResult>(T1 arg1)
public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)
public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)
public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)
..............
public delegate TResult Func<T1,T2, T3, ,T4, ...... ,T16,TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)
Action<>與Func<>十分類似,不一樣在於Action<>的返回值爲void,Action能接受0~16個參數
public delegate void Action<T1>()
public delegate void Action<T1,T2>(T1 arg1,T2 arg2)
public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)
.............
public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)
7.2 任務並行庫(TPL)
System.Threading.Tasks中的類被統稱爲任務並行庫(Task Parallel Library,TPL),TPL使用CLR線程池把工做分配到CPU,並能自動處理工做分區、線程調度、取消支持、狀態管理以及其餘低級別的細節操做,極大地簡化了多線程的開發。
TPL包括經常使用的數據並行與任務並行兩種執行方式:
7.2.1 數據並行
數據並行的核心類就是System.Threading.Tasks.Parallel,它包含兩個靜態方法 Parallel.For 與 Parallel.ForEach, 使用方式與for、foreach相仿。經過這兩個方法能夠並行處理System.Func<>、System.Action<>委託。
如下一個例子就是利用 public static ParallelLoopResult For( int from, int max, Action<int>) 方法對List<Person>進行並行查詢。
假設使用單線程方式查詢3個Person對象,須要用時大約6秒,在使用並行方式,只需使用2秒就能完成查詢,並且可以避開Thread的繁瑣處理。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置最大線程數
6 ThreadPool.SetMaxThreads(1000, 1000);
7 //並行查詢
8 Parallel.For(0, 3,n =>
9 {
10 Thread.Sleep(2000); //模擬查詢
11 ThreadPoolMessage(GetPersonList()[n]);
12 });
13 Console.ReadKey();
14 }
15
16 //模擬源數據
17 static IList<Person> GetPersonList()
18 {
19 var personList = new List<Person>();
20
21 var person1 = new Person();
22 person1.ID = 1;
23 person1.Name = "Leslie";
24 person1.Age = 30;
25 personList.Add(person1);
26 ...........
27 return personList;
28 }
29
30 //顯示線程池現狀
31 static void ThreadPoolMessage(Person person)
32 {
33 int a, b;
34 ThreadPool.GetAvailableThreads(out a, out b);
35 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
36 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
37 " CompletionPortThreads is :{5}\n",
38 person.ID, person.Name, person.Age,
39 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
40
41 Console.WriteLine(message);
42 }
43 }
觀察運行結果,對象並不是按照原排列順序進行查詢,而是使用並行方式查詢。
若想中止操做,能夠利用ParallelLoopState參數,下面以ForEach做爲例子。
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> action)
其中source爲數據集,在Action<TSource,ParallelLoopState>委託的ParallelLoopState參數當中包含有Break()和 Stop()兩個方法均可以使迭代中止。Break的使用跟傳統for裏面的使用方式類似,但由於處於並行處理當中,使用Break並不能保證全部運行能當即中止,在當前迭代以前的迭代會繼續執行。若想當即中止操做,可使用Stop方法,它能保證當即終止全部的操做,不管它們是處於當前迭代的以前仍是以後。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置最大線程數
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //並行查詢
9 Parallel.ForEach(GetPersonList(), (person, state) =>
10 {
11 if (person.ID == 2)
12 state.Stop();
13 ThreadPoolMessage(person);
14 });
15 Console.ReadKey();
16 }
17
18 //模擬源數據
19 static IList<Person> GetPersonList()
20 {
21 var personList = new List<Person>();
22
23 var person1 = new Person();
24 person1.ID = 1;
25 person1.Name = "Leslie";
26 person1.Age = 30;
27 personList.Add(person1);
28 ..........
29 return personList;
30 }
31
32 //顯示線程池現狀
33 static void ThreadPoolMessage(Person person)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
39 " CompletionPortThreads is :{5}\n",
40 person.ID, person.Name, person.Age,
41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42
43 Console.WriteLine(message);
44 }
45 }
觀察運行結果,當Person的ID等於2時,運行將會中止。
當要在多個線程中調用本地變量,可使用如下方法:
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<Of TSource>, Func<Of TLocal>, Func<Of TSource,ParallelLoopState,TLocal,TLocal>, Action<Of TLocal>)
其中第一個參數爲數據集;
第二個參數是一個Func委託,用於在每一個線程執行前進行初始化;
第 三個參數是委託Func<Of T1,T2,T3,TResult>,它能對數據集的每一個成員進行迭代,當中T1是數據集的成員,T2是一個ParallelLoopState對 象,它能夠控制迭代的狀態,T3是線程中的本地變量;
第四個參數是一個Action委託,用於對每一個線程的最終狀態進行最終操做。
在如下例子中,使用ForEach計算多個Order的整體價格。在ForEach方法中,首先把參數初始化爲0f,而後用把同一個Order的多個OrderItem價格進行累加,計算出Order的價格,最後把多個Order的價格進行累加,計算出多個Order的整體價格。
1 public class Order
2 {
3 public int ID;
4 public float Price;
5 }
6
7 public class OrderItem
8 {
9 public int ID;
10 public string Goods;
11 public int OrderID;
12 public float Price;
13 public int Count;
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 //設置最大線程數
21 ThreadPool.SetMaxThreads(1000, 1000);
22 float totalPrice = 0f;
23 //並行查詢
24 var parallelResult = Parallel.ForEach(GetOrderList(),
25 () => 0f, //把參數初始值設爲0
26 (order, state, orderPrice) =>
27 {
28 //計算單個Order的價格
29 orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID)
30 .Sum(item => item.Price * item.Count);
31 order.Price = orderPrice;
32 ThreadPoolMessage(order);
33
34 return orderPrice;
35 },
36 (finallyPrice) =>
37 {
38 totalPrice += finallyPrice;//計算多個Order的整體價格
39 }
40 );
41
42 while (!parallelResult.IsCompleted)
43 Console.WriteLine("Doing Work!");
44
45 Console.WriteLine("Total Price is:" + totalPrice);
46 Console.ReadKey();
47 }
48 //虛擬數據
49 static IList<Order> GetOrderList()
50 {
51 IList<Order> orderList = new List<Order>();
52 Order order1 = new Order();
53 order1.ID = 1;
54 orderList.Add(order1);
55 ............
56 return orderList;
57 }
58 //虛擬數據
59 static IList<OrderItem> GetOrderItem()
60 {
61 IList<OrderItem> itemList = new List<OrderItem>();
62
63 OrderItem orderItem1 = new OrderItem();
64 orderItem1.ID = 1;
65 orderItem1.Goods = "iPhone 4S";
66 orderItem1.Price = 6700;
67 orderItem1.Count = 2;
68 orderItem1.OrderID = 1;
69 itemList.Add(orderItem1);
70 ...........
71 return itemList;
72 }
73
74 //顯示線程池現狀
75 static void ThreadPoolMessage(Order order)
76 {
77 int a, b;
78 ThreadPool.GetAvailableThreads(out a, out b);
79 string message = string.Format("OrderID:{0} OrderPrice:{1}\n" +
80 " CurrentThreadId is {2}\n WorkerThreads is:{3}" +
81 " CompletionPortThreads is:{4}\n",
82 order.ID, order.Price,
83 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
84
85 Console.WriteLine(message);
86 }
87 }
運行結果
7.2.2 任務並行
在TPL當中還可使用Parallel.Invoke方法觸發多個異步任務,其中 actions 中能夠包含多個方法或者委託,parallelOptions用於配置Parallel類的操做。
public static void Invoke(Action[] actions )
public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
下面例子中利用了Parallet.Invoke並行查詢多個Person,actions當中能夠綁定方法、lambda表達式或者委託,注意綁定方法時必須是返回值爲void的無參數方法。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置最大線程數
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //任務並行
9 Parallel.Invoke(option,
10 PersonMessage,
11 ()=>ThreadPoolMessage(GetPersonList()[1]),
12 delegate(){
13 ThreadPoolMessage(GetPersonList()[2]);
14 });
15 Console.ReadKey();
16 }
17
18 static void PersonMessage()
19 {
20 ThreadPoolMessage(GetPersonList()[0]);
21 }
22
23 //顯示線程池現狀
24 static void ThreadPoolMessage(Person person)
25 {
26 int a, b;
27 ThreadPool.GetAvailableThreads(out a, out b);
28 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
29 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
30 " CompletionPortThreads is :{5}\n",
31 person.ID, person.Name, person.Age,
32 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
33
34 Console.WriteLine(message);
35 }
36
37 //模擬源數據
38 static IList<Person> GetPersonList()
39 {
40 var personList = new List<Person>();
41
42 var person1 = new Person();
43 person1.ID = 1;
44 person1.Name = "Leslie";
45 person1.Age = 30;
46 personList.Add(person1);
47 ..........
48 return personList;
49 }
50 }
運行結果
7.3 Task簡介
以Thread建立的線程被默認爲前臺線程,固然你能夠把線程IsBackground屬性設置爲true,但TPL爲此提供了一個更簡單的類Task。
Task存在於System.Threading.Tasks命名空間當中,它能夠做爲異步委託的簡單替代品。
經過Task的Factory屬性將返回TaskFactory類,以TaskFactory.StartNew(Action)方法能夠建立一個新線程,所建立的線程默認爲後臺線程。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 Task.Factory.StartNew(() => ThreadPoolMessage());
7 Console.ReadKey();
8 }
9
10 //顯示線程池現狀
11 static void ThreadPoolMessage()
12 {
13 int a, b;
14 ThreadPool.GetAvailableThreads(out a, out b);
15 string message = string.Format("CurrentThreadId is:{0}\n" +
16 "CurrentThread IsBackground:{1}\n" +
17 "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n",
18 Thread.CurrentThread.ManagedThreadId,
19 Thread.CurrentThread.IsBackground.ToString(),
20 a.ToString(), b.ToString());
21 Console.WriteLine(message);
22 }
23 }
運行結果
若要取消處理,能夠利用CancellationTakenSource對象,在TaskFactory中包含有方法
public Task StartNew( Action action, CancellationToken cancellationToken )
在方法中加入CancellationTakenSource對象的CancellationToken屬性,能夠控制任務的運行,調用CancellationTakenSource.Cancel時任務就會自動中止。下面以圖片下載爲例子介紹一下TaskFactory的使用。
服務器端頁面
1 <html xmlns="http://www.w3.org/1999/xhtml">
2 <head runat="server">
3 <title></title>
4 <script type="text/C#" runat="server">
5 private static List<string> url=new List<string>();
6
7 protected void Page_Load(object sender, EventArgs e)
8 {
9 if (!Page.IsPostBack)
10 {
11 url.Clear();
12 Application["Url"] = null;
13 }
14 }
15
16 protected void CheckBox_CheckedChanged(object sender, EventArgs e)
17 {
18 CheckBox checkBox = (CheckBox)sender;
19 if (checkBox.Checked)
20 url.Add(checkBox.Text);
21 else
22 url.Remove(checkBox.Text);
23 Application["Url"]= url;
24 }
25 </script>
26 </head>
27 <body>
28 <form id="form1" runat="server" >
29 <div align="left">
30 <div align="center" style="float: left;">
31 <asp:Image ID="Image1" runat="server" ImageUrl="~/Images/A.jpg" /><br />
32 <asp:CheckBox ID="CheckBox1" runat="server" AutoPostBack="True"
33 oncheckedchanged="CheckBox_CheckedChanged" Text="A.jpg" />
34 </div>
35 <div align="center" style="float: left">
36 <asp:Image ID="Image2" runat="server" ImageUrl="~/Images/B.jpg" /><br />
37 <asp:CheckBox ID="CheckBox2" runat="server" AutoPostBack="True"
38 oncheckedchanged="CheckBox_CheckedChanged" Text="B.jpg" />
39 </div>
40 <div align="center" style="float: left">
41 <asp:Image ID="Image3" runat="server" ImageUrl="~/Images/C.jpg" /><br />
42 <asp:CheckBox ID="CheckBox3" runat="server" AutoPostBack="True"
43 oncheckedchanged="CheckBox_CheckedChanged" Text="C.jpg" />
44 </div>
45 <div align="center" style="float: left">
46 <asp:Image ID="Image4" runat="server" ImageUrl="~/Images/D.jpg" /><br />
47 <asp:CheckBox ID="CheckBox4" runat="server" AutoPostBack="True"
48 oncheckedchanged="CheckBox_CheckedChanged" Text="D.jpg" />
49 </div>
50 <div align="center" style="float: left">
51 <asp:Image ID="Image5" runat="server" ImageUrl="~/Images/E.jpg" /><br />
52 <asp:CheckBox ID="CheckBox5" runat="server" AutoPostBack="True"
53 oncheckedchanged="CheckBox_CheckedChanged" Text="E.jpg" />
54 </div>
55 </div>
56 </form>
57 </body>
58 </html>
首先在服務器頁面中顯示多個*.jpg圖片,每一個圖片都有對應的CheckBox檢測其選擇狀況。
所選擇圖片的路徑會記錄在Application["Url"]當中傳遞到Handler.ashx當中。
Handler.ashx 處理圖片的下載,它從 Application["Url"] 當中獲取所選擇圖片的路徑,並把圖片轉化成byte[]二進制數據。
再把圖片的數量,每副圖片的二進制數據的長度記錄在OutputStream的頭部。
最後把圖片的二進制數據記入 OutputStream 一併輸出。
1 public class Handler : IHttpHandler
2 {
3 public void ProcessRequest(HttpContext context)
4 {
5 //獲取圖片名,把圖片數量寫OutputStream
6 List<String> urlList = (List<string>)context.Application["Url"];
7 context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4);
8
9 //把圖片轉換成二進制數據
10 List<string> imageList = GetImages(urlList);
11
12 //把每副圖片長度寫入OutputStream
13 foreach (string image in imageList)
14 {
15 byte[] imageByte=Convert.FromBase64String(image);
16 context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4);
17 }
18
19 //把圖片寫入OutputStream
20 foreach (string image in imageList)
21 {
22 byte[] imageByte = Convert.FromBase64String(image);
23 context.Response.OutputStream.Write(imageByte,0,imageByte.Length);
24 }
25 }
26
27 //獲取多個圖片的二進制數據
28 private List<string> GetImages(List<string> urlList)
29 {
30 List<string> imageList = new List<string>();
31 foreach (string url in urlList)
32 imageList.Add(GetImage(url));
33 return imageList;
34 }
35
36 //獲取單副圖片的二進制數據
37 private string GetImage(string url)
38 {
39 string path = "E:/My Projects/Example/WebSite/Images/"+url;
40 FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read);
41 byte[] imgBytes = new byte[10240];
42 int imgLength = stream.Read(imgBytes, 0, 10240);
43 return Convert.ToBase64String(imgBytes,0,imgLength);
44 }
45
46 public bool IsReusable
47 {
48 get{ return false;}
49 }
50 }
客戶端
創建一個WinForm窗口,裏面加入一個WebBrowser鏈接到服務器端的Default.aspx頁面。
當按下Download按鍵時,系統就會利用TaskFactory.StartNew的方法創建異步線程,使用WebRequest方法向Handler.ashx發送請求。
接收到回傳流時,就會根據頭文件的內容判斷圖片的數量與每副圖片的長度,把二進制數據轉化爲*.jpg文件保存。
系統利用TaskFactory.StartNew(action,cancellationToken) 方式異步調用GetImages方法進行圖片下載。
當用戶按下Cancel按鈕時,異步任務就會中止。值得注意的是,在圖片下載時調用了CancellationToken.ThrowIfCancellationRequested方法,目的在檢查並行任務的運行狀況,在並行任務被中止時釋放出OperationCanceledException異常,確保用戶按下Cancel按鈕時,中止全部並行任務。
1 public partial class Form1 : Form
2 {
3 private CancellationTokenSource tokenSource = new CancellationTokenSource();
4
5 public Form1()
6 {
7 InitializeComponent();
8 ThreadPool.SetMaxThreads(1000, 1000);
9 }
10
11 private void downloadToolStripMenuItem_Click(object sender, EventArgs e)
12 {
13 Task.Factory.StartNew(GetImages,tokenSource.Token);
14 }
15
16 private void cancelToolStripMenuItem_Click(object sender, EventArgs e)
17 {
18 tokenSource.Cancel();
19 }
20
21 private void GetImages()
22 {
23 //發送請求,獲取輸出流
24 WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx");
25 Stream responseStream=webRequest.GetResponse().GetResponseStream();
26
27 byte[] responseByte = new byte[81960];
28 IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null);
29 int responseLength = responseStream.EndRead(result);
30
31 //獲取圖片數量
32 int imageCount = BitConverter.ToInt32(responseByte, 0);
33
34 //獲取每副圖片的長度
35 int[] lengths = new int[imageCount];
36 for (int n = 0; n < imageCount; n++)
37 {
38 int length = BitConverter.ToInt32(responseByte, (n + 1) * 4);
39 lengths[n] = length;
40 }
41 try
42 {
43 //保存圖片
44 for (int n = 0; n < imageCount; n++)
45 {
46 string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n);
47 FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite);
48
49 //計算字節偏移量
50 int offset = (imageCount + 1) * 4;
51 for (int a = 0; a < n; a++)
52 offset += lengths[a];
53
54 file.Write(responseByte, offset, lengths[n]);
55 file.Flush();
56
57 //模擬操做
58 Thread.Sleep(1000);
59
60 //檢測CancellationToken變化
61 tokenSource.Token.ThrowIfCancellationRequested();
62 }
63 }
64 catch (OperationCanceledException ex)
65 {
66 MessageBox.Show("Download cancel!");
67 }
68 }
69 }
7.4 並行查詢(PLINQ)
並行 LINQ (PLINQ) 是 LINQ 模式的並行實現,主要區別在於 PLINQ 嘗試充分利用系統中的全部處理器。 它利用全部處理器的方法,把數據源分紅片斷,而後在多個處理器上對單獨工做線程上的每一個片斷並行執行查詢, 在許多狀況下,並行執行意味着查詢運行速度顯著提升。但這並不說明全部PLINQ都會使用並行方式,當系統測試要並行查詢會對系統性能形成損害時,那將自動化地使用同步執行。
在System.Linq.ParallelEnumerable類中,包含了並行查詢的大部分方法。
方法成員 |
說明 |
AsParallel |
PLINQ 的入口點。 指定若是可能,應並行化查詢的其他部分。 |
AsSequential(Of TSource) |
指定查詢的其他部分應像非並行 LINQ 查詢同樣按順序運行。 |
AsOrdered |
指定 PLINQ 應保留查詢的其他部分的源序列排序,直到例如經過使用 orderby(在 Visual Basic 中爲 Order By)子句更改排序爲止。 |
AsUnordered(Of TSource) |
指定查詢的其他部分的 PLINQ 不須要保留源序列的排序。 |
WithCancellation(Of TSource) |
指定 PLINQ 應按期監視請求取消時提供的取消標記和取消執行的狀態。 |
WithDegreeOfParallelism(Of TSource) |
指定 PLINQ 應當用來並行化查詢的處理器的最大數目。 |
WithMergeOptions(Of TSource) |
提供有關 PLINQ 應當如何(若是可能)將並行結果合併回到使用線程上的一個序列的提示。 |
WithExecutionMode(Of TSource) |
指定 PLINQ 應當如何並行化查詢(即便默認行爲是按順序運行查詢)。 |
ForAll(Of TSource) |
多線程枚舉方法,與循環訪問查詢結果不一樣,它容許在不首先合併回到使用者線程的狀況下並行處理結果。 |
Aggregate 重載 |
對於 PLINQ 惟一的重載,它啓用對線程本地分區的中間聚合以及一個用於合併全部分區結果的最終聚合函數。 |
7.4.1 AsParallel
一般想要實現並行查詢,只需向數據源添加 AsParallel 查詢操做便可。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel()
6 .Where(x=>x.Age>30);
7 Console.ReadKey();
8 }
9
10 //模擬源數據
11 static IList<Person> GetPersonList()
12 {
13 var personList = new List<Person>();
14
15 var person1 = new Person();
16 person1.ID = 1;
17 person1.Name = "Leslie";
18 person1.Age = 30;
19 personList.Add(person1);
20 ...........
21 return personList;
22 }
23 }
7.4.2 AsOrdered
若要使查詢結果必須保留源序列排序方式,可使用AsOrdered方法。
AsOrdered依然使用並行方式,只是在查詢過程加入額外信息,在並行結束後把查詢結果再次進行排列。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel().AsOrdered()
6 .Where(x=>x.Age<30);
7 Console.ReadKey();
8 }
9
10 static IList<Person> GetPersonList()
11 {......}
12 }
7.4.3 WithDegreeOfParallelism
默認狀況下,PLINQ 使用主機上的全部處理器,這些處理器的數量最多可達 64 個。
經過使用 WithDegreeOfParallelism(Of TSource) 方法,能夠指示 PLINQ 使用很少於指定數量的處理器。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2)
6 .Where(x=>x.Age<30);
7 Console.ReadKey();
8 }
9
10 static IList<Person> GetPersonList()
11 {.........}
12 }
7.4.4 ForAll
若是要對並行查詢結果進行操做,通常會在for或foreach中執行,執行枚舉操做時會使用同步方式。
有見及此,PLINQ中包含了ForAll方法,它可使用並行方式對數據集進行操做。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 GetPersonList().AsParallel().ForAll(person =>{
7 ThreadPoolMessage(person);
8 });
9 Console.ReadKey();
10 }
11
12 static IList<Person> GetPersonList()
13 {.......}
14
15 //顯示線程池現狀
16 static void ThreadPoolMessage(Person person)
17 {
18 int a, b;
19 ThreadPool.GetAvailableThreads(out a, out b);
20 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
21 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
22 " CompletionPortThreads is :{5}\n",
23 person.ID, person.Name, person.Age,
24 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
25 Console.WriteLine(message);
26 }
27 }
運行結果
7.4.5 WithCancellation
若是須要中止查詢,可使用 WithCancellation(Of TSource) 運算符並提供 CancellationToken 實例做爲參數。
與第三節Task的例子類似,若是標記上的 IsCancellationRequested 屬性設置爲 true,則 PLINQ 將會注意到它,並中止全部線程上的處理,而後引起 OperationCanceledException。這能夠保證並行查詢可以當即中止。
1 class Program
2 {
3 static CancellationTokenSource tokenSource = new CancellationTokenSource();
4
5 static void Main(string[] args)
6 {
7 Task.Factory.StartNew(Cancel);
8 try
9 {
10 GetPersonList().AsParallel().WithCancellation(tokenSource.Token)
11 .ForAll(person =>
12 {
13 ThreadPoolMessage(person);
14 });
15 }
16 catch (OperationCanceledException ex)
17 { }
18 Console.ReadKey();
19 }
20
21 //在10~50毫秒內發出中止信號
22 static void Cancel()
23 {
24 Random random = new Random();
25 Thread.Sleep(random.Next(10,50));
26 tokenSource.Cancel();
27 }
28
29 static IList<Person> GetPersonList()
30 {......}
31
32 //顯示線程池現狀
33 static void ThreadPoolMessage(Person person)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
39 " CompletionPortThreads is :{5}\n",
40 person.ID, person.Name, person.Age,
41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42 Console.WriteLine(message);
43 }
44 }
45
8、定時器與鎖
8.1定時器
若要長期定時進行一些工做,好比像郵箱更新,實時收聽信息等等,能夠利用定時器Timer進行操做。
在System.Threading命名空間中存在Timer類與對應的TimerCallback委託,它能夠在後臺線程中執行一些長期的定時操做,使主線程不受干擾。
Timer類中最經常使用的構造函數爲 public Timer( timerCallback , object , int , int )
timerCallback委託能夠綁定執行方法,執行方法必須返回void,它能夠是無參數方法,也能夠帶一個object參數的方法。
第二個參數是爲 timerCallback 委託輸入的參數對象。
第三個參數是開始執行前等待的時間。
第四個參數是每次執行之間的等待時間。
開發實例
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6
7 TimerCallback callback = new TimerCallback(ThreadPoolMessage);
8 Timer t = new Timer(callback,"Hello Jack! ", 0, 1000);
9 Console.ReadKey();
10 }
11
12 //顯示線程池現狀
13 static void ThreadPoolMessage(object data)
14 {
15 int a, b;
16 ThreadPool.GetAvailableThreads(out a, out b);
17 string message = string.Format("{0}\n CurrentThreadId is:{1}\n" +
18 " CurrentThread IsBackground:{2}\n" +
19 " WorkerThreads is:{3}\n CompletionPortThreads is:{4}\n",
20 data + "Time now is " + DateTime.Now.ToLongTimeString(),
21 Thread.CurrentThread.ManagedThreadId,
22 Thread.CurrentThread.IsBackground.ToString(),
23 a.ToString(), b.ToString());
24 Console.WriteLine(message);
25 }
26 }
注意觀察運行結果,每次調用Timer綁定的方法時不必定是使用同一線程,但線程都會是來自工做者線程的後臺線程。
8.2 鎖
在使用多線程開發時,存在必定的共用數據,爲了不多線程同時操做同一數據,.NET提供了lock、Monitor、Interlocked等多個鎖定數據的方式。
8.2.1 lock
lock的使用比較簡單,若是須要鎖定某個對象時,能夠直接使用lock(this)的方式。
1 private void Method()
2 {
3 lock(this)
4 {
5 //在此進行的操做能保證在同一時間內只有一個線程對此對象操做
6 }
7 }
若是操做只鎖定某段代碼,能夠事先創建一個object對象,並對此對象進行操做鎖定,這也是.net提倡的鎖定用法。
1 class Control
2 {
3 private object obj=new object();
4
5 public void Method()
6 {
7 lock(obj)
8 {.......}
9 }
10 }
8.2.2 Montior
Montior存在於System.Thread命名空間內,相比lock,Montior使用更靈活。
它存在 Enter, Exit 兩個方法,它能夠對對象進行鎖定與解鎖,比lock使用更靈活。
1 class Control
2 {
3 private object obj=new object();
4
5 public void Method()
6 {
7 Monitor.Enter(obj);
8 try
9 {......}
10 catch(Excetion ex)
11 {......}
12 finally
13 {
14 Monitor.Exit(obj);
15 }
16 }
17 }
18
使用try的方式,能確保程序不會因死鎖而釋放出異常!
並且在finally中釋放obj對象可以確保不管是否出現死鎖狀態,系統都會釋放obj對象。
並且Monitor中還存在Wait方法可讓線程等待一段時間,而後在完成時使用Pulse、PulseAll等方法通知等待線程。
8.2.3 Interlocked
Interlocked存在於System.Thread命名空間內,它的操做比Monitor使用更簡單。
它存在CompareExchange、Decrement、Exchange、Increment等經常使用方法讓參數在安全的狀況進行數據交換。
Increment、Decrement 可使參數安全地加1或減1並返回遞增後的新值。
1 class Example
2 {
3 private int a=1;
4
5 public void AddOne()
6 {
7 int newA=Interlocked.Increment(ref a);
8 }
9 }
Exchange能夠安全地變量賦值。
1 public void SetData()
2 {
3 Interlocked.Exchange(ref a,100);
4 }
CompareExchange使用特別方便,它至關於if的用法,當a等於1時,則把100賦值給a。
1 public void CompareAndExchange()2 {3 Interlocked.CompareExchange(ref a,100,1);4 }