【C#進階系列】26 計算限制的異步操做

什麼是計算限制的異步操做,當線程在要使用CPU進行計算的時候,那麼就叫計算限制。web

而對應的IO限制就是線程交給IO設備(鍵鼠,網絡,文件等)。數組

第25章線程基礎講了用專用的線程進行計算限制的操做,可是建立專用線程開銷大,並且太多的線程也浪費內存資源,那麼本章就討論一種更好的方法,即線程池技術。安全

CLR線程池網絡

CLR包含了代碼來管理它本身的線程池。線程池是應用程序能使用的線程集合,每一個CLR一個線程池,這個線程池由CLR上全部的AppDomain共享。數據結構

CLR初始化時線程池中沒有線程。多線程

在線程池內部維護着一個操做請求隊列,應用程序執行異步操做時,就調用某方法將一個記錄項追加到線程池隊列中。異步

線程池的代碼從這個隊列中提取記錄項,將這個記錄項派發給一個線程池線程,若是線程池沒有線程,就建立一個新線程。async

當線程池線程完成任務後,線程不會被銷燬。相反,線程會返回線程池,在那裏進入空閒狀態,等待響應另外一個請求。、函數

因爲線程池不銷燬自身,而且再次作異步操做不用建立新的線程,因此再也不產生額外的性能損失。oop

若是應用程序向線程池發送了不少記錄項到線程池隊列,線程池最開始會嘗試只用一個線程服務全部記錄項,然而若是添加記錄項的速度超過了線程池線程處理記錄項的速度,就會建立額外的線程。

若是再也不往線程池中發送請求,池中存在大量什麼都不作的線程。那麼這些閒置的線程池線程會在一段時間後本身醒來終止本身並釋放資源。

寫到這裏就應該很清楚了,若是垃圾回收器是幫咱們自動回收垃圾,那麼線程池技術就是幫咱們自動管理線程。

用線程池技術執行簡單的計算限制操做

很少作解釋,直接上代碼更易懂,可對比上一章的用專用線程來進行計算限制的異步操做的代碼,這樣更易於理解:

    static void Main(string[] args)
        {
            ThreadPool.QueueUserWorkItem(線程回調函數, "hello");
            Console.WriteLine("記錄項進入線程池隊列");
            Console.Read();
        }
        private static void 線程回調函數(Object 狀態參數) {
            Thread.Sleep(10000);
            if (狀態參數.GetType() == typeof(string))
            {
                Console.WriteLine("這是一個字符串");
            }
            else {
                Console.WriteLine("未識別");
            }
        }

執行上下文

每一個線程都關聯一個執行上下文數據結構。

執行上下文包括的東西有

安全設置(壓縮站、Thread的Principal屬性和Windows身份),

宿主設置(參見System.Threading.HostExecutionContextManager)

以及邏輯調用上下文數據(參見Sysyem.Runtime.Remoting.Messaging.CallContext的LogicalSetData和LogicalGetData方法)。

線程執行代碼時,一些操做可能會用到執行上下文結構。

而每當一個線程使用另外一個線程執行任務時,前者的執行上下文會複製給後者的執行上下文。這樣就確保了執行任何操做都使用相同的安全設置和宿主設置。還確保了在線程中邏輯調用上下文中存儲的任何數據都適用於被使用的另外一個線程。

默認狀況下,CLR自動形成初始線程的執行上下文複製到任何輔助線程。

這會形成性能影響,由於收集上下文信息並複製到輔助線程,花費很多時間,若是輔助線程中還有輔助線程,那麼開銷更大。

System.Threading命名空間有一個ExecutionContext類,也就是執行上下文類,它容許你控制線程的執行上下文是否複製到另外一個線程。

經常使用的方法有三個,SuppressFlow(取消複製執行上下文),RestoreFlow(恢復複製執行上下文),IsFlowSuppressed(是否上下文複製被取消).

上代碼,更簡單:

     static void Main(string[] args)
        {
            CallContext.LogicalSetData("操做", "將一個鍵值對放入執行上下文中");
            ThreadPool.QueueUserWorkItem(
                state=>Console.WriteLine("第一次"+CallContext.LogicalGetData("操做"))
            );
            ExecutionContext.SuppressFlow();//取消執行上下文在異步線程間的複製
            ThreadPool.QueueUserWorkItem(
                state => Console.WriteLine("第二次" + CallContext.LogicalGetData("操做"))
            );
            ExecutionContext.RestoreFlow();//恢復執行上下文在異步線程間的複製
            ThreadPool.QueueUserWorkItem(
               state => Console.WriteLine("第三次" + CallContext.LogicalGetData("操做"))
           );
            Console.Read();
        }

代碼運行結果以下:

由於是異步操做,因此執行順序不一樣,可是咱們這裏僅僅關注執行結果就好了,第二次確實沒有將執行上下文複製到另外一個線程中。

另外這裏不只僅是指線程池,專用線程也是同樣的。

協做式取消和超時

.NET提供了標準的協做式取消操做模式,意味着要取消的操做必須顯示支持取消。

也就是說不管執行操做的代碼,仍是取消操做的代碼,都必須使用本節提到的類型。

取消操做首先要建立一個CancellationTokenSource對象。

這個對象包含了和管理取消有關的全部狀態。可今後對象的Token屬性獲取一個或多個CancellationToken實例,並傳給操做,使操做能夠取消。

而CancellationToken是輕量級的值類型,包含單個私有字段即對其CancellationTokenSource對象的引用。

在計算限制操做的循環中,能夠定時調用CancellationToken的IsCancellationRequested屬性,瞭解循環是否應該提早終止,從而終止計算限制的操做。

如下爲演示代碼:

static void Main(string[] args)
        {
            var cts = new CancellationTokenSource();
            ThreadPool.QueueUserWorkItem(state => Farm(cts.Token, 850));//Farm一個治療指環
            Console.WriteLine("按回車取消Farm");
            Console.ReadLine();
            cts.Cancel();// 取消Farm操做
            Console.Read();
        }

        //Farm指定數量的錢就返回
        private static void Farm(CancellationToken token,int money) {
            var currentMoney = 0;
            while (currentMoney < money) {
                if (token.IsCancellationRequested) {
                    Console.WriteLine("肯定取消Farm");
                    break;
                }
                currentMoney += 50;
                Console.WriteLine("Troy已經Farm了" + currentMoney + "");
                Thread.Sleep(1000);//一秒鐘補一個兵
                
            }
        }

上效果圖:

而若是要Farm操做不容許被取消,能夠傳CancellationToken.None。

可經過調用CancellationToken的Register方法登記一個或多個在取消操做時調用的函數。、

可經過CancellationTokenSource的CreateLinkedTokenSource函數連接其餘CancellationTokenSource對象來建立一個新的對象A,若是任意一個被連接的對象取消,那麼A也會被取消。

可經過傳給CancellationTokenSource的構造器延時變量,表示在指定的一段時間後CancellationTokenSource自動取消。

任務
ThreadPool的QueueUserWorkItem方法發起一次異步的計算限制操做,然而並無機制讓咱們知道何時這個操做完成,也沒有機制在操做完成時獲取返回值。

爲了克服這個限制並解決其它一些問題,微軟引入了任務的概念。(經過System.Threading.Task命名空間中的類型來使用任務)

如下代碼爲線程池玩法和任務玩法的對比

ThreadPool.QueueUserWorkItem(線程回調函數, "hello");//線程池玩法
new Task(線程回調函數, "hello").Start();//任務的玩法1
Task.Run(() => 線程回調函數("hello"));//任務的玩法2

在構造Task對象的時候,還能夠傳遞CancellationToken,用於任務取消,也能夠傳遞TaskCreationOptions標誌來控制Task的執行方式。

接下來就寫段代碼,看看人物是如何等待任務完成並獲取結果的

        static void Main(string[] args)
        {
            Task<Tuple<Boolean, String>> myTask = new Task<Tuple<bool, string>>(賞金任務, 100);
            myTask.Start();
            Thread.Sleep(10000);
            Console.WriteLine("任務進行中");
            myTask.Wait();//顯示等待任務結束
            Console.WriteLine("任務結果爲:" + myTask.Result.Item2);
            Console.ReadLine();
        }

        private static Tuple<Boolean, String> 賞金任務(object state) {
            Console.WriteLine("Troy接手了這個賞金任務,並獲取了{0}金",state.ToString());
            return new Tuple<bool, string>(true, "成功");
        }

Tuple<Boolean, string>爲任務返回的結果類型,給Task的泛型變量就應該和所調用函數的返回值同樣。

結果以下:

從這個結果咱們瞭解到任務確實是異步執行了,而且確實返回了正確的結果給myTask.Result。

當調用Wait()函數時當前線程會阻塞,直到任務結束。(若是沒用start,直接wait,那麼任務也會執行。只不過此時線程不會被阻塞,它會直接執行任務並當即返回)

除了等待單個任務,實際上Task還提供了WaitAny和WaitAll兩個靜態方法來阻塞線程,等待一個Task數組,直到數組中的全部Task完成。

取消任務

前面講到任務也能夠傳CancellationToken,用於取消任務。

在其它的地方同樣,不過在判斷任務是否取消的地方,應該用CancellationToken對象的ThrowIfCancellationRequested()方法而不是用IsCancellationRequested進行判斷。

緣由是不像線程池的QueueUserWorkItem,任務有辦法表示完成,也能夠返回一個值,因此須要採用一種方式將已完成的任務和出錯的任務區分開。

而讓任務拋出異常,就能夠知道任務沒有一直運行到結束。

上代碼:

     static void Main(string[] args)
        {
            var cts = new CancellationTokenSource();
            Task<Tuple<Boolean, String>> myTask = Task.Run(()=>賞金任務(cts.Token,100), cts.Token );
            Thread.Sleep(5000);
            cts.Cancel();
            try
            {
                Console.WriteLine("任務結果爲:" + myTask.Result.Item2);
            }
            catch (AggregateException ex)
            {
                //將任何OperationCanceledException對象都視爲已處理
                //其餘任何異常都形成拋出一個新的AggregateException
                //其中只包含未處理的異常
                ex.Handle(e => e is OperationCanceledException);//對異常集合的每一個異常都調用處理程序
                Console.WriteLine("取消任務");
            }
            catch {
                Console.WriteLine("未知異常");
            }
            Console.Read();
        }

        private static Tuple<Boolean, String> 賞金任務(CancellationToken ct,object state) {
            for (int i = 0; i < 100; i++)
            {
                ct.ThrowIfCancellationRequested();
                Console.WriteLine("Troy接手了這個賞金任務,並獲取了{0}金", state.ToString());
                Thread.Sleep(1000);
            }
            return new Tuple<Boolean, String>(true, "成功");
        }

上結果:

任務完成時自動啓動新任務

伸縮性好的軟件不該該使用線程阻塞。

調用Wait或者在任務還沒有完成時查詢任務的Result屬性,極有可能形成線程池建立新的線程。

如下方法能夠知道任務在何時結束,且不使用阻塞。

Task<Tuple<Boolean, String>> myTask = Task.Run(()=>賞金任務(cts.Token,100), cts.Token );//建立並啓動一個任務
Task myTask1 = myTask.ContinueWith(task => Console.WriteLine("任務結果爲:" + task.Result.Item2));

且還能夠傳給它TaskContinuationOptions位標誌來控制繼續的任務。默認狀況下,不指定任何TaskContinuationOptions位標誌,那麼不管第一個任務取消仍是失敗,都會繼續執行第二個任務。

任務調用的函數中建立的任務,被稱爲子任務,有一些關於父任務和子任務的處理,用TaskContinuationOptions或以前介紹的TaskCreationOptions來控制。

實際上Task對象內部有一個ContinueWith任務的集合,也就是說一個Task能夠屢次ContinueWith,這個Task在任務完成後會執行全部的ContinueWith任務。

任務的內部揭祕

每一個Task對象內部都有一組字段,這些字段構成了任務的狀態。

任務雖然頗有用,可是它也是有代價的。必須爲全部這些狀態分配內存,若是不須要任務的附加功能(也就是知道什麼時候結束且能夠返回值),那麼使用ThreadPool的QueueUserWorkItem能得到更好的資源利用率。

Task對象的只讀屬性Status返回一個TaskStatus枚舉值,該枚舉值代表了任務正處於一個怎樣的狀態。

當任務建立後,狀態爲Created,啓動後爲WatingToRun,實際在一個線程中運行後爲Running,中止運行並等待任何子任務時爲WaitingForChildrenToComplete。

完成時進入一下狀態:RanToCompletion(完成),Canceled(取消),Faulted(出錯)。

若是任務出錯,可查詢任務的Exception屬性獲取任務拋出的未處理異常。其老是返回一個AggregateException對象,其InnerExceptions集合包含了全部未處理異常。

調用ContinueWith,ContinueWhenAll,ContinueWhenAny或FromAsync等方法建立的Task對象處於WatingForActivation狀態。該狀態表示任務隱式建立,並會自動開始。

任務工廠

有時須要建立一組共享相同配置的Task對象。爲避免機械地將相同的參數傳遞給每一個Task的構造器,可建立一個任務工廠來封裝通用配置。

而TaskFactory類型就是這個目的。

創工廠類時,要向構造器傳遞全部要建立的任務都具備的默認值,也就是CancellationToken,TaskScheduler,TaskCreationOption和TaskContinuationOptions。

來個簡單演示:

var tf = new TaskFactory<Int32>(
                cts.Token,
                TaskCreationOptions.AttachedToParent,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default
            );
            //用任務工廠建立三個任務
            var childTasks = new[] {
                tf.StartNew(()=> { Console.WriteLine("任務1");return 1; }),
                tf.StartNew(()=> { Console.WriteLine("任務2");return 2; }),
                tf.StartNew(()=> { Console.WriteLine("任務3");return 3; })
            };
            tf.ContinueWhenAll(childTasks, completedTask => completedTask
                 .Where(t => !t.IsFaulted && !t.IsCanceled).Max(t => t.Result), CancellationToken.None)
                .ContinueWith(t => Console.WriteLine("最後的任務返回結果爲" + t.Result), TaskContinuationOptions.ExecuteSynchronously);
            Console.Read();

這只是最基礎的用法,取消的時候只須要傳給他一個Token,那麼一旦取消,整個task數組中的任務都會取消。

任務調度器

任務基礎結構很是靈活,其中TaskScheduler對象功不可沒。

此對象負責執行被調度的任務。FCL提供了兩個派生自TaskScheduler的類型:線程池任務調度器和同步上下文調度器。默認狀況下全部應用程序使用的都是線程池任務調度器。

同步上下文任務調度器適合提供了圖形用戶界面的應用程序。它將全部任務都調度給程序的GUI線程,使全部任務代碼都能成功更新UI組件。該調度不使用線程池。

可執行TaskScheduler的靜態FromCurrentSynchronizationContext()方法來獲取對同步上下文任務調度器的引用。

這個玩法貌似我用不到,作web的啊,並且看了例子很簡單,因此這裏就不寫了。

Parallel的靜態For,ForEach和Invoke方法

Parallel就是並行的意思。

主要是用於將一些常見的for或者foreach循環用任務進行多線程化,以提高性能。

System.Threading.Tasks.Parallel類封裝了這些情形,例以下面的代碼:

        for (int i = 0; i < 1000; i++) DoSomething(i);//for循環作某事
            Parallel.For(0, 1000, i => DoSomething(i));//Parallel替代方案,線程池並行處理工做
            foreach (var item in collection) DoSomething(item);//foreach循環作某事
            Parallel.ForEach(collection, l => DoSomething(l));//Parallel替代方案,線程池並行處理工做
            //若是能夠用For而不是ForEach,那麼就用For,由於更快
            //順序執行全部方法
            Method1();
            Method2();
            Method3();
            //Parallel替代方案,順序執行全部方法
            Parallel.Invoke(() => Method1(), () => Method2(), () => Method3());

若是調用線程在線程池執行完任務以前執行完了本身的那部分工做,那麼調用線程會掛起,等待任務完成。

然而調用Parallel的方法時,請注意所作的工做必定要能並行執行,若是必需要順序執行,那麼仍是用原來的for循環比較好。

若是有大量的工做項(也就是循環次數不少),或者是每次循環作的事情涉及大量工做,那麼用Parallel性能會獲得很大提高,反之,性能可能得不償失。

Parallel的方法均可以接收一個ParallelOptions對象,這個對象能夠對Parallel的工做方式作一些配置。

還能夠傳遞一個ParallelloopState對象來控制循環任務的執行。

此對象的Stop方法,讓循環中止,Break讓循環再也不處理後面的工做。

並行語言集成查詢(PLINQ)

LINQ提供了一簡捷的語法來查詢數據集合。然而其只能一個線程順序處理數據集合中的全部項。這就是順序查詢。

而要提升性能,可使用PLINQ,也就是並行LINQ。它將順序查詢轉換爲並行查詢。

靜態System.Linq.ParallelEnumerable類(在System.Core.dll中定義)實現了PLINQ的全部功能,因此必須經過C#的using指令將System.Linq命名空間導入源代碼。

而全部的Where,Select之類方法的並行版本,都是System.Linq.ParallelQuery<T>類型的擴展方法。

下面舉個簡單的例子:

        List<string> nameList = new List<string>() { "Troy", "小二", "小三", "小四" };
            var query = from name in nameList.AsParallel()//啓用查詢的並行化,將其轉換爲ParallelQuery<string> 
                        let myName = "我叫" + name
                        where name == "Troy"
                        select myName;
            Parallel.ForEach(query, l => Console.WriteLine(l));
            //query.ForAll(l => Console.WriteLine(l));//也能夠用這行代碼替代上一句,ParallelQuery有個ForAll方法,爲每一個查詢的結果執行內容
            Console.Read();

以上例子只是爲了演示玩法,並不考慮效率。

經過上面的例子其實能夠發現,PLINQ和LINQ沒有任何區別,只要將集合調用AsParallel()便可。

而若是要講並行查詢再轉換爲並行查詢,那麼能夠用AsSequential()。

上面的這個例子用順序查詢實際上快得多。並且Console內部會對線程同步,確保每次只有一個線程來訪問控制檯窗口,因此這裏用並行操做實際上還會損壞性能。

用PLINQ由於是並行處理數據,因此返回的都是無序結構,若是要保持順序,那麼應該調用AsOrdered方法,調用後會成組處理數據,而後組合並後保持順序,能夠想象這也會損耗性能。

並且如下操做符也會聲稱不排序的操做:Distinct,Except,Intersect,Union,Join,GroupBy,GroupJoin和ToLookup.若是這些操做後還要排序,那麼又要調用AsOrdered方法。

同事PLINQ提供了一些額外的方法:

WithCancellation(容許取消),

WithDegreeOfParallelism(指定最多的線程數),

WithExecutionMode(傳遞ParallelExecutionMode標誌),

WithMergeOptions(PLINQ讓多個線程處理數據後會合併,因此可傳參ParallelMergeOptions位標誌,控制結果的緩衝和合並方式。有緩衝傾向於加快速度,無緩衝傾向於節約內存)。

執行定時計算限制操做

System.Threading命名空間有一個Timer類,可執行定時操做。

在內部,線程池爲全部Timer對象都只使用一個線程,此線程知道下一個Timer對象在何時到期。到期後,線程會被喚醒,在內部調用線程池的QueueUserWorkItem,將工做項加入線程池隊列。

這個點就很少講了,很常見。(垃圾回收那一章講過若是Timer對象在代碼上看起來沒被使用會致使被回收,因此要有變量保持Timer對象存活)

若是要定時執行某操做,可使用Task的靜態Delay方法和C#的async和await關鍵字。(下一章會講到,這裏只給一個簡單例子)

        static void Main(string[] args)
        {
            asyncDoSomething();
            Console.Read();
        }

        private static async void asyncDoSomething() {
            while (true) {
                Console.WriteLine("time is {0}", DateTime.Now);
                //不阻塞線程的前提下延遲兩秒
                await Task.Delay(2000);//await容許線程返回
                //2秒後某個線程會在await後介入並繼續循環
            }
        }

線程池如何管理線程

CLR容許開發人員設置線程池要建立的最大線程數。(然而若是設定了這個值,那麼就可能發生飢餓和死鎖)。

默認的最大線程數目前爲1000個左右。

可經過Threadpool類的一些靜態方法如GetMaxThreads,SetMinThreads來限制線程池的線程數,然而做者並不建議這麼作。

 

Threadpool.QueueUserWorkItem方法和Timer類老是將工做項放在一個線程池全局隊列中的(用的先入先出模式),因此多個工做者線程可能同時從這個隊列中取工做項。爲了保證多個工做者線程不會取到一個工做項,因此實際上全部工做者線程都競爭同一個線程同步鎖。

而對於任務,非工做者線程調用一個任務時(用非默認的TaskScheduler任務調度器),任務被放進全局隊列。

而工做者線程調度Task時,都有本身的本地隊列。工做者線程準備處理工做項時,先檢查本地隊列,因爲工做者線程是惟一容許訪問自身的本地隊列,因此這裏不須要線程同步鎖。(因此在本地隊列刪除和增長Task的速度很快,本地隊列的處理用的是後入先出模式)

若是某個工做者線程的本地隊列空了,那麼它會從其它隊列找工做項去執行,並要求獲取一個線程同步鎖。

若是全部工做者線程的本地隊列都空了,那麼這個時候才檢查全局隊列。

若是全局隊列也空了,那麼工做者線程會進入睡眠,等待事情發生。

若是睡眠事件太長,會自動喚醒,並銷燬自身。

相關文章
相關標籤/搜索