通常來講若是計算機的 CPU 利用率沒有 100% ,那麼說明不少進程的部分線程沒有運行。可能在等待 文件/網絡/數據庫等設備讀取或者寫入數據,又多是等待按鍵、鼠標移動等事件。算法
執行 I/O 限制的操做時,操做系統經過設備驅動程序通知硬件幹活,而 CPU 處於一種空閒狀態。而在現代應用程序當中,使用線程池來執行計算限制的操做,而不是手動建立線程。數據庫
每一個 CLR 都有本身獨立的線程池,而且由各自 CLR 控制的全部 AppDomain 所共享。安全
線程池自己維護了一個請求隊列,當程序須要執行一個異步操做的時候,會將一個記錄項追加到隊列之中,而後由線程池將該記錄項分派給線程池線程,若是沒有線程則建立一個新的線程。線程任務處理完整以後,將該線程放入線程池中等待之後進行復用。網絡
線程池自己是啓發式的,結合程序負載,他會本身根據當前線程池內線程的狀態銷燬/新增線程。數據結構
經過 ThreadPool
靜態類,咱們能夠方便地使用線程池中的線程爲咱們執行一些計算限制的異步操做。只須要調用 ThreadPool
的 QueueUserWorkItem(WaitCallBack callback)
方法,或者是他的另外一個重載方法,接收一個 state 值做爲參數。多線程
他的兩個方法都是非阻塞的,調用以後會當即返回。異步
WaitCallBack
的方法簽名以下:函數
delegate void WaitCallBack(Object state);
在 CLR 的線程池中,將 callback 委託做爲工做項添加到隊列當中,而後由線程池分發線程進行處理。oop
【注意】性能
一旦回調方法拋出了未處理的異常,CLR 會當即終止進程。
每一個線程都有一個執行上下文的數據結構,包含由安全設置,宿主設置和邏輯調用上下文數據(AsyncLocal 與 CallContext)。
當在某個線程(例如主線程)使用了另一個線程(輔助線程),就會產生執行上下文由調用者線程流向被調用線程。這會對性能形成必定的影響,這是由於執行上下文包含有大量地信息。而若是輔助線程又調用了更多的輔助線程,這個時候執行上下問的複製開銷就很是大。
咱們能夠經過 ExecutionContext
類控制線程的執行上下文是否流向輔助線程,只有輔助線程不須要訪問執行上下文時能夠阻止執行上下文流動。當阻止了執行上下文流動時,輔助線程會使用最後一次與其關聯的任意執行上下文,這個時候對於安全設置等就不可信,不該執行任何依賴於執行上下文的操做。
通常來講在主線程,能夠經過 ExecutionContext.SuppressFlow();
方法阻止執行上下文流動,而後再經過 ExecutionContext.RestoreFlow();
恢復流動。
.NET 提供了標準的取消操做模式,這個模式是協做式的,也就是你要取消的操做必須顯式聲明本身能夠被取消。這是由於用戶在執行某些長耗時的計算限制操做的時候,可能會由於等待時間太長或者其餘緣由須要取消這個操做。
首先咱們經過 System.Threading.CancellationTokenSource
對象管理或者取消對象狀態,使用時直接 new 一個便可,而該對象擁有一個 CancellationToken
對象。
這個 Token 對象用於傳遞給執行計算限制操做的方法,經過該 Token 的 IsCancellationRequested
屬性你能夠在方法內部確認任務是否被取消,若是被取消你就能夠進行返回操做。
例子以下:
static void Main(string[] args) { var tokenSource = new CancellationTokenSource(); ThreadPool.QueueUserWorkItem(z => Calculate(CancellationToken.None, 10000)); Console.ReadKey(); tokenSource.Cancel(); Console.ReadLine(); } private static void Calculate(CancellationToken token, int count) { for (int i = 0; i < count; i++) { if (token.IsCancellationRequested) { Console.WriteLine("用戶提早終止操做,退出線程.."); break; } Console.WriteLine(count); Thread.Sleep(200); } Console.WriteLine("計數完成."); }
【注意】
若是你要執行一個不容許被取消的操做,能夠爲方法傳遞一個
CancellationToken.None
對象,由於該對象沒有 Source 源,則不可能會被調用Cancel()
進行取消。
CancellationToken
容許咱們經過 Register()
方法註冊多個委託,這些被註冊了的委託會在 TokenSource 調用 Cancel
取消的時候優先調用,其調用的前後順序爲註冊時的順序。
【注意】
調用
Register()
方法的時候,他有兩個bool
類型的參數,分別是useSyncContext
與useExecutionContext
。這兩個參數用於指定,是否要用調用線程的同步上下文或者執行上下文來調用回調函數。
同時在註冊成功以後會返回一個 CancellationTokenRegistration
對象,經過調用該對象的 Dispose
方法能夠刪除已經註冊的委託回調,這樣在取消時就不會調用該回調。
能夠經過 CancellationTokenSource.CreateLinkedTokenSource()
連接兩個或多個對象,連接成功後會返回一個單獨的 TokenSource 對象。
一旦這個新對象連接的任何一個 TokenSource 對象被取消的時候,該對象也會被取消掉。
在調用 TokenSource 的 Cancel()
方法時(默認爲 false),該方法還有另一個重載傳入 bool
值,若是爲 true 的時候,有多個註冊的回調委託,一旦某個出現異常直接會被拋出該異常,不會等待其餘回調執行完畢。
若是爲 false,則會等到全部回調方法執行完成時,拋出一個 AggregateException
異常,內部的 InnerExceptions
包含有全部在執行過程當中產生的異常信息集合。
除了直接調用 Cancel()
當即取消操做以外,還有一個延遲取消的方法 CancelAfter()
,經過傳遞具體的延遲時間,咱們能夠在指定的之間以後取消某個任務。(PS:有點像 Polly 的 TimeOut )
爲啥使用任務,雖然經過 ThreadPool
能夠很方便地發起一次計算限制的操做,可是你不知道你的方法啥時候執行完成,也沒法在操做完成以後得到返回值。
使用任務執行一個計算限制操做有兩種方式,二者也同樣的能夠傳遞 CancellationToken
進行取消操做:
new Task(Sum,20).Start();
Task.Run(()=>Sum(20));
除此以外還能夠在構造 Task
時 傳遞一些標誌位,用於任務調度器進行一些特殊處理。
任務除了標準的無返回值的 Task
類型以外,還有一個包含有泛型參數的 Task<TResult>
類型,其中 TResult 參數就是任務的返回值類型。
在建立好 Task<TResult>
對象以後,能夠經過 Task.Wait()
等待任務執行完成,Task 的 Wait()
方法會阻塞調用者線程直到任務執行完成。執行完成以後,能夠經過 Task.Reuslt
獲取任務執行以後的返回值。
PS:
這裏獲取
Result
屬性值時,其內部也會調用Wait()
方法。
若是該 Task 內的計算限制操做拋出了未經處理的異常,這個異常會被吞噬掉,調用 Wait()
方法或者使用 Result
屬性的時候,這些異常信息會被包裹在 AggregateException
內部並返回給調用者線程。
【注意】
不推薦直接調用
Wait()
,若是 Task 已經開始執行,該方法會阻塞調用者線程,直到執行完成。第二種狀況是任務尚未開始執行的時候,調用者線程不會被阻塞,Task 當即執行並返回。而調度器可能會使用調用者線程來執行這些操做,這個時候,若是調用者線程獲取了一個線程同步鎖,而 Task 由於也是在調用者線程執行,嘗試獲取鎖的時候,就會產生死鎖。
AggregateException
可能會包含有多個異常,這個時候可使用該對象的 Handle(Func<Exception,bool> predicate)
方法來爲每個異常進行處理,處理返回 true,未處理返回 false。
在調用了 Handle 方法以後,仍然有異常沒有處理,這些沒有處理的異常又會形成一個新的 AggregateException
被拋出。
【注意】
若是不知道有哪些 Task 內部未處理的異常,能夠經過象任務調度器的
UnobservedTaskException
事件登記一個回調方法,若是存在一個沒有處理到的異常,則會調用你的回調方法,並將異常傳遞給你。
除了 Task.Wait()
方法,還有等待一組任務的 Task.WaitAny()
和 Task.WaitAll()
。幾個方法都會阻塞調用者線程,前者當傳遞的一組任務有任意一個完成則當即返回該任務的索引,後者則是要等待這一組任務所有完成以後纔會喚醒調用線程。
這兩個方法一旦被取消,都會拋出 OperationCanceledException
異常。
能夠經過一個 CancellationTokenSource
來取消 Task,同樣的須要傳入的計算限制方法添加 CancellationToken
參數。
只不過呢,在 Task 任務內部咱們不經過 IsCancellationRequested
來判斷任務是否取消,而是經過調用 Token 的 ThrowIfCancellationRequested()
方法來拋出異常。
該方法會判斷當前任務是否被取消,若是被取消了,則拋出異常。這是由於與直接經過線程池添加任務不一樣,線程池沒法知道任務什麼時候完成,而任務則能夠表示是否完成,而且還能返回值。
以前說過經過調用 Task.Wait()
或者在任務還沒有完成的時候調用 Task.Result
屬性,都會形成線程池建立新的線程。而咱們能夠經過在任務完成以後,當即開啓一個新的任務,這樣咱們就能夠經過新的任務知道前一個任務是否已經完成了。
建立一個的計算限制任務對象,咱們在啓動了該任務對象以後,調用 Task.ContinueWith()
方法來建立一個延續任務,新的延續性任務會有一個 Task 參數,該參數就是最開始的任務。
而在使用 Task.ContinueWith()
時,他還能夠傳遞一個標識位。這個標識位用於代表這個延續性任務是在第一個任務什麼狀況下才會執行,通常有三種:OnlyOnCanceled(第一個任務取消時才被執行)、OnlyOnFaulted(第一個任務拋出未處理異常時執行)、OnlyOnRanToCompletion(第一個任務順利完成時執行)。
一個任務在其內部能夠建立其子任務,只須要在內部構造 Task 對象的時候,傳遞一個標識位 TaskCreationOptions.AttachedToParent
將其與父任務關聯。這樣的話,除非其全部子任務執行完成,父任務不會被認爲已經完成。
延續性任務也能夠做爲第一個任務的子任務,指定對應的標識位便可。
任務主要由如下幾部分構成:
能夠看到構造一個 Task 仍是須要比較大的開銷的,若是你不須要 Task 的附加特性,徹底可使用 TaskPool.QueueUserworkItem
來得到更好的性能與效率。
經過 Task 的只讀屬性 Task.Status
,咱們能夠知道任務目前處於哪一種狀態,其最終的狀態主要有 3 種,分別是:RanToCompletion(已完成)、Canceled(被取消)、Faulted(出現異常失敗),這三種狀態都屬於任務完成狀態。
另外值得注意的是,經過 ContinueWith()
、ContinueWhenAll()
、ContinueWhenAny()
等方法建立的任務狀態都爲 WaitingForActivation
,這個狀態表明任務會自動開始。
若是你須要在執行多個相同配置的 Task 對象,能夠經過 TaskFactory
和 TaskFactory<TResult>
,其大概含義與 Task
的含義相同。
在建立工廠時,能夠傳遞一些經常使用的配置標識位和 CancellationToken 對象,以後咱們能夠經過 StartNew()
方法來統一執行一堆任務。
任務調度器通常有兩種,第一種是線程池任務調度器,通常用於服務端程序。還有一種是同步上下文任務調度器,通常用於 GUI 程序。
For 與 Foreach 基本用於操做一個集合,而後循環處理其值。而若是在某個方法內部須要執行多個方法,則能夠經過 Invoke 來進行執行。使用 Parallel 類可讓 CPU 最大化地利用起來而不會阻塞主線程。
不過通常不會將全部 For 與 Foreach 都替換爲並行化的查詢,這是由於某些循環會修改共享數據,這個時候使用 Parallel 的操做則會破壞數據,雖然能夠經過增長線程同步鎖來解決,不過這樣會形成單線程訪問,沒法享受並行處理的好處。
同時 Parallel 方法自己也會有開銷,固然在大量重複性任務中這種開銷能夠忽略不計,可是若是僅爲幾十個短耗時的計算限制任務啓用 Parallel 就會得不償失。
這三種操做都接受一個ParallelOptions
對象用於配置最大並行的工做項數目與調度器。
Parallel 的 For 與 Foreach 的一個重載方法容許傳入 3 個委託,他們分別是:
從上述邏輯來看,能夠看做局部初始化委託爲一個父任務,後面兩個爲子級連續任務的構造。
實例:
static void Main(string[] args) { var files = new List<string>(); files.AddRange(Directory.GetFiles(@"E:\Program Files","*.*",SearchOption.AllDirectories)); files.AddRange(Directory.GetFiles(@"E:\Program Files (x86)","*.*",SearchOption.AllDirectories)); files.AddRange(Directory.GetFiles(@"E:\Project","*.*",SearchOption.AllDirectories)); files.AddRange(Directory.GetFiles(@"E:\Cache","*.*",SearchOption.AllDirectories)); files.AddRange(Directory.GetFiles(@"E:\Windows Kits","*.*",SearchOption.AllDirectories)); files.AddRange(Directory.GetFiles(@"C:\Program Files\dotnet","*.*",SearchOption.AllDirectories)); Console.WriteLine($"總文件數量:{files.Count}"); long allFileCount = 0; var watch = new Stopwatch(); watch.Start(); Parallel.ForEach<string, long>(files, localInit: () => { // 初始化文件大小爲 0, // 這裏的參數類型取決於任務返回的參數 return 0; }, body: (fileName, parallelStatus, index, fileCount) => { // 計算文件大小並返回 long count = 0; try { var info = new FileInfo(fileName); count = info.Length; } catch (Exception e) { } // 這裏由於該任務會被線程池複用,因此要進行累加 return count + fileCount; }, localFinally: fileCount => { Interlocked.Add(ref allFileCount, fileCount); } ); watch.Stop(); Console.WriteLine($"並行效率:{watch.ElapsedMilliseconds} ms"); Console.WriteLine($"文件總大小:{allFileCount / 1024 / 1024 / 1024} Gb"); allFileCount = 0; watch.Reset(); watch.Start(); foreach (var file in files) { long count = 0; try { var info = new FileInfo(file); count = info.Length; } catch (Exception e) { } allFileCount+=count; } watch.Stop(); Console.WriteLine($"單線程效率:{watch.ElapsedMilliseconds} ms"); Console.WriteLine($"文件總大小:{allFileCount / 1024 / 1024 / 1024} Gb"); Console.ReadLine(); }
性能提高:
經過 Parallel 的 Foreach 與普通的 foreach 遍歷計算,性能整體提高了約 56%,越耗時的操做提高的效果就越明顯。
在 Body 的主體委託當中,傳入了一個 ParallelLoopState
對象,該對象用於每一個線程與其餘任務進行交互。主要有兩個方法 Stop()
與 Break()
,前者用於中止循環,後者用於跳出循環,而且跳出循環以後,其 LowestBreakIteration
會返回調用過 Break()
方法的最低項。
而且 Parallel 還會返回一個 ParallelLoopResult
對象,該經過該對象咱們能夠得知這些循環是否正常完成。
LINQ 默認查詢的方式是一個線程順序處理數據集合中的全部項,稱之爲順序查詢。而 PLINQ 就是將這些操做分散到各個 CPU 並行執行,經過 AsParallel()
擴展方法能夠將 IEnumerable<TSource>
轉換爲 ParallelQuery<TSource>
。
而從並行操做切換回順序操做,只須要調用 ParallelEnumable
的 AsSequential()
方法便可。
通過 PLINQ 處理後的數據項其結果是無序的,若是須要有序結果,能夠調用 AsOrdered()
方法。可是該方法比較損耗性能,通常不推薦使用,若是須要排序應該直接使用與 LINQ 同名的 PLINQ 擴展方法。
PLINQ 通常會本身分析使用最好的查詢方式進行查詢,有時候使用順序查詢性能更好。
WithCancellation()
:容許取消某個 PLINQ 查詢。WithExecutionMode()
:容許配置 PLINQ 執行模式,是否強制並行查詢。WithMergeOptions()
:容許配置結果的緩衝與合併方式。WithDegreeOfParallelism()
:容許配置查詢的最大並行數。PS:
不建議在多線程環鏡中使用
Console.Write()
進行輸出,由於 Console 類內部會對線程進行同步,確保只有一個線程能夠訪問控制檯窗口,這樣會損害性能。
經過 CLR 提供的 Timer
定時器,咱們能夠傳入一個回調方法。這樣的話計時器會能夠根據傳入的週期,來定時將咱們的回調方法經過線程池線程進行調用。
同時計時器還容許傳入一個 dueTime
參數來指定這個計時器首次調用回調方法時須要等待多久(當即執行能夠傳入 0),而 period
能夠指定 Timer
調用回調方法的週期。
【原理】
在線程池內部全部的
Timer
對象只使用了一個線程,當某個Timer
到期的時候,這個線程就會被喚醒。該線程經過ThreadPool.QueueUserWorkItem()
方法將一個工做項添加到線程池隊列,這樣你的回調方法就會獲得執行。
【注意】
若是回調方法執行的時常超過了你設置的週期時常,這樣會形成多個線程都在執行你的回調。由於
Timer
不知道你的回調執行完成沒有,他只會到期執行你的回調方法。解決措施是構造一個
Timer
的時候,爲period
指定一個Timeout.Infinite
常量,這樣計時器只會觸發一次。以後在你的回調方法執行完成以後,在其內部經過Timer.Change()
方法指定一個執行週期,而且設置其 dueTime 爲當即執行。這樣作了以後,你的
Timer
就會確保你的回調被執行完成以後再開始下一個週期。這一點能夠參考 Abp 實現的
AbpTimer
對象。
CLR 容許開發人員設置線程池最大工做者線程數,可是通常不要輕易設置該值,但你能夠經過 ThreadPool.GetMaxThreads()
、ThreadPool.GetMinThreads()
、GetAvailableThreads()
方法來獲取一些相關信息。
經過 ThreadPool.QueueUserWorkItem()
方法和 Timer
類處理的工做項老是存儲到 CLR 線程池的 全局隊列 中。工做者線程採用一個 FIFO 算法將工做項從 全局隊列 取出,由於全部工做者線程都有可能去這個隊列拿去工做項,這個時候會使用 線程同步鎖 以確保工做項只會被工做者線程處理一次。這可能會形成性能瓶頸,對伸縮性和性能會形成某些限制。
默認的任務調度器中,非工做者線程調度 Task 時都是存放在全局隊列,而工做者線程調度 Task 則是存放在他本身的本地隊列。
工做者線程處理 Task 的步驟:
PS:
結合上下文,說明工做項首先被添加到了全局隊列,而後由工做者線程取到本身的本地隊列進行處理。
線程池會動態地根據工做項的多少動態地調整工做者線程的數量,通常不須要開發人員進行管控。