今天記錄一下異步多線程的進階歷史,以及簡單的使用方法數據庫
主要仍是以Task,Parallel爲主,畢竟用的比較多的如今就是這些了,再往前去的,除非是老項目,否則真的應該是挺少了,大概有個概念,就當瞭解一下進化史了編程
1:委託異步多線程,全部的異步都是基於委託來實現的安全
#region 委託異步多線程 { //委託異步多線程 Stopwatch watch = new Stopwatch(); watch.Start(); Console.WriteLine($"開始執行了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")},,,,{Thread.CurrentThread.ManagedThreadId}"); Action<string> act = DoSomethingLong; for (int i = 0; i < 5; i++) { //int ThreadId = Thread.CurrentThread.ManagedThreadId;//獲取當前線程ID string name = $"Async {i}"; act.BeginInvoke(name, null, null); } watch.Stop(); Console.WriteLine($"結束執行了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")},,,,{watch.ElapsedMilliseconds}"); } #endregion
2:多線程的最老版本:Thread(好像是2.0的時候出的?記不得了)多線程
//Thread //Thread默認屬於前臺線程,啓動後必須完成 //Thread有不少Api,但大多數都已經不用了 Console.WriteLine("Thread多線程開始了"); Stopwatch watch = new Stopwatch(); watch.Start(); //線程容器 List<Thread> list = new List<Thread>(); for (int i = 0; i < 5; i++) { //int ThreadId = Thread.CurrentThread.ManagedThreadId;//獲取當前線程ID string name = $"Async {i}"; ThreadStart start1 = () => { DoSomethingLong(name); }; Thread thread = new Thread(start1); thread.IsBackground = true;//設置爲後臺線程,關閉後當即退出 thread.Start(); list.Add(thread); //thread.Suspend();//暫停,已經不用了 //thread.Resume();//恢復,已經不用了 //thread.Abort();//銷燬線程 //中止線程靠的不是外部力量,而是線程自身,外部修改信號量,線程檢測信號量 } //判斷當前線程狀態,來作線程等待 while (list.Count(t => t.ThreadState != System.Threading.ThreadState.Stopped) > 0) { Console.WriteLine("等待中.....");
Thread.Sleep(500); } //統計正確所有耗時,經過join來作線程等待 foreach (var item in list) { item.Join();//線程等待,表示把thread線程任務join到當前線程,也就是當前線程等着thread任務完成 //等待完成後統計時間 } watch.Stop();
Thread的無返回值回調:併發
封裝一個方法異步
/// <summary> /// 回調封裝,無返回值 /// </summary> /// <param name="start"></param> /// <param name="callback"></param> private static void ThreadWithCallback(ThreadStart start, Action callback) { ThreadStart nweStart = () => { start.Invoke(); callback.Invoke(); }; Thread thread = new Thread(nweStart); thread.Start(); }
//回調的委託 Action act = () => { Console.WriteLine("回調函數"); }; //要執行的異步操做 ThreadStart start = () => { Console.WriteLine("正常執行。。"); }; ThreadWithCallback(start, act);
有返回值的回調:函數
/// <summary> /// 回調封裝,有返回值 /// 想要獲取返回值,必需要有一個等待的過程 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="func"></param> /// <returns></returns> private static Func<T> ThreadWithReturn<T>(Func<T> func) { T t = default(T); //ThreadStart自己也是一個委託 ThreadStart start = () => { t = func.Invoke(); }; //開啓一個子線程 Thread thread = new Thread(start); thread.Start(); //返回一個委託,由於委託自己是不執行的,因此這裏返回出去的是尚未執行的委託 //等在獲取結果的地方,調用該委託 return () => { //只是判斷狀態的方法 while (thread.ThreadState != System.Threading.ThreadState.Stopped) { Thread.Sleep(500); } //使用線程等待 //thread.Join(); //以上兩種均可以 return t; }; }
Func<int> func = () => { Console.WriteLine("正在執行。。。"); Thread.Sleep(10000); Console.WriteLine("執行結束。。。"); return DateTime.Now.Year; }; Func<int> newfunc = ThreadWithReturn(func); //這裏爲了方便測試,只管感覺回調的執行原理 Console.WriteLine("Else....."); Thread.Sleep(100); Console.WriteLine("Else....."); Thread.Sleep(100); Console.WriteLine("Else....."); //執行回調函數裏return的委託獲取結果 //newfunc.Invoke(); Console.WriteLine($"有參數回調函數的回調結果:{newfunc.Invoke()}");
關於有返回值的回調,由於委託是在執行Invoke的時候纔會去調用委託的方法,因此在調用newfunc.Invoke()的時候,纔會去委託裏面抓取值,這裏會有一個等待的過程,等待線程執行完成測試
3:ThreadPool:線程池spa
線程池是在Thread後出的,算是一種很給力的進化pwa
在Thread的年代,線程是直接從計算機裏抓取的,而線程池的出現,就是給開發人員提供一個容器,能夠從容器中抓取線程,也就是線程池
好處就在於能夠避免頻繁的建立和銷燬線程,直接從線程池拿線程,用完在還回去,當不夠的時候,線程池在從計算機中給你分配,豈不是很爽?
線程池的數量是能夠控制的,最小線程數量:8
ThreadPool.QueueUserWorkItem(p => { //這裏的p是沒有值的 Console.WriteLine(p); Thread.Sleep(2000); Console.WriteLine($"線程池線程,{Thread.CurrentThread.ManagedThreadId}"); }); ThreadPool.QueueUserWorkItem(p => { //這裏的p就是傳進來的值 Console.WriteLine(p); Thread.Sleep(2000); Console.WriteLine($"線程池線程,帶參數,{Thread.CurrentThread.ManagedThreadId}"); }, "這裏是參數");
線程池用起來仍是很方便的,也能夠控制線程數量
線程池裏有兩種線程:普通線程,IO線程,我比較喜歡在操做IO的時候使用ThreadPool
int workerThreads = 0; int completionPortThreads = 0; //設置線程池的最大線程數量(普通線程,IO線程) ThreadPool.SetMaxThreads(80, 80); //設置線程池的最小線程數量(普通線程,IO線程) ThreadPool.SetMinThreads(8, 8); ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads); Console.WriteLine($"當前可用最大普通線程:{workerThreads},IO:{completionPortThreads}"); ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads); Console.WriteLine($"當前可用最小普通線程:{workerThreads},IO:{completionPortThreads}");
ThreadPool並無Thread的Join等待接口,那麼想讓ThreadPool等待要這麼作呢?
ManualResetEvent:通知一個或多個正在等待的線程已發生的事件,至關於發送了一個信號
mre.WaitOne:卡住當前主線程,一直等到信號修改成true的時候,纔會接着往下跑
//用來控制線程等待,false默認爲關閉狀態 ManualResetEvent mre = new ManualResetEvent(false); ThreadPool.QueueUserWorkItem(p => { DoSomethingLong("控制線程等待"); Console.WriteLine($"線程池線程,帶參數,{Thread.CurrentThread.ManagedThreadId}"); //通知線程,修改信號爲true mre.Set(); }); //阻止當前線程,直到等到信號爲true在繼續執行 mre.WaitOne(); //關閉線程,至關於設置成false mre.Reset(); Console.WriteLine("信號被關閉了"); ThreadPool.QueueUserWorkItem(p => { Console.WriteLine("再次等待"); mre.Set(); }); mre.WaitOne();
4:Task,這也是如今用的最多的了,畢竟是比較新的玩意
關於Task就介紹幾個最經常使用的接口,基本上就夠用了(一招鮮吃遍天)
Task.Factory.StartNew:建立一個新的線程,Task的線程也是從線程池中拿的(ThreadPool)
Task.WaitAny:等待一羣線程中的其中一個完成,這裏是卡主線程,一直等到一羣線程中的最快的一個完成,才能繼續往下執行(20年前我也差點被後面的給追上),打個簡單的比方:從三個地方獲取配置信息(數據庫,config,IO),同時開啓三個線程來訪問,誰快我用誰。
Task.WaitAll:等待全部線程完成,這裏也是卡主線程,一直等待全部子線程完成任務,才能繼續往下執行。
Task.ContinueWhenAny:回調形式的,任意一個線程完成後執行的後續動做,這個就跟WaitAny差很少,只不是是回調形式的。
Task.ContinueWhenAll:回調形式的,全部線程完成後執行的後續動做,理解同上
//線程容器
List<Task> taskList = new List<Task>(); Stopwatch watch = new Stopwatch(); watch.Start(); Console.WriteLine("************Task Begin**************"); //啓動5個線程 for (int i = 0; i < 5; i++) { string name = $"Task:{i}"; Task task = Task.Factory.StartNew(() => { DoSomethingLong(name); }); taskList.Add(task); } //回調形式的,任意一個完成後執行的後續動做 Task any = Task.Factory.ContinueWhenAny(taskList.ToArray(), task => { Console.WriteLine("ContinueWhenAny"); }); //回調形式的,所有任務完成後執行的後續動做 Task all = Task.Factory.ContinueWhenAll(taskList.ToArray(), tasks => { Console.WriteLine($"ContinueWhenAll{tasks.Length}"); }); //把兩個回調也放到容器裏面,包含回調一塊兒等待 taskList.Add(any); taskList.Add(all); //WaitAny:線程等待,當前線程等待其中一個線程完成 Task.WaitAny(taskList.ToArray()); Console.WriteLine("WaitAny"); //WaitAll:線程等待,當前線程等待全部線程的完成 Task.WaitAll(taskList.ToArray()); Console.WriteLine("WaitAll"); Console.WriteLine($"************Task End**************{watch.ElapsedMilliseconds},{Thread.CurrentThread.ManagedThreadId}");
關於Task其實只要熟練掌握這幾個接口,基本上就夠了,徹底夠用
5:Parallel(並行編程):其實就是在Task基礎上又進行了一次封裝,固然,Parallel也有很酷炫功能
問:首先是爲何叫作並行編程,跟Task有什麼區別?
答:使用Task開啓子線程的時候,主線程是屬於空閒狀態,並不參與執行(我是領導,有5件事情須要處理,我安排了5個手下去作,而我自己就是觀望狀態 PS:究竟是領導。),Parallel開啓子線程的時候,主線程也會參與計算(我是領導,我有5件事情須要處理,我身爲領導,可是我很勤勞,因此我作了一件事情,另外四件事情安排4個手下去完成),很明顯,減小了開銷。
你覺得Parallel就只有這個炫酷的功能?大錯特錯,更炫酷的還有;
Parallel能夠控制線程的最大併發數量,啥意思?就是無論你是腦溢血,仍是心臟病,仍是動脈大出血,個人手術室只有2個,同時也只能給兩我的作手術,作完一個在進來一個,同時作完兩個,就同時在進來兩個,多了不行。
當前,你想使用Task,或者ThreadPool來實現這樣的效果也是能夠的,不過這就須要你動動腦筋了,固然,有微軟給封裝好的接口直接使用,豈不美哉?
//並行編程 Console.WriteLine($"*************Parallel start********{Thread.CurrentThread.ManagedThreadId}****"); //一次性執行1個或多個線程,效果相似:Task WaitAll,只不過Parallel的主線程也參與了計算 Parallel.Invoke( () => { DoSomethingLong("Parallel`1"); }, () => { DoSomethingLong("Parallel`2"); }, () => { DoSomethingLong("Parallel`3"); }, () => { DoSomethingLong("Parallel`4"); }, () => { DoSomethingLong("Parallel`5"); }); //定義要執行的線程數量 Parallel.For(0, 5, t => { int a = t; DoSomethingLong($"Parallel`{a}"); }); { ParallelOptions options = new ParallelOptions() { MaxDegreeOfParallelism = 3//執行線程的最大併發數量,執行完成一個,就接着開啓一個 }; //遍歷集合,根據集合數量執行線程數量 Parallel.ForEach(new List<string> { "a", "b", "c", "d", "e", "f", "g" }, options, t => { DoSomethingLong($"Parallel`{t}"); }); } { ParallelOptions options = new ParallelOptions() { MaxDegreeOfParallelism = 3//執行線程的最大併發數量,執行完成一個,就接着開啓一個 }; //遍歷集合,根據集合數量執行線程數量 Parallel.ForEach(new List<string> { "a", "b", "c", "d", "e", "f", "g" }, options, (t, status) => { //status.Break();//這一次結束。 //status.Stop();//整個Parallel結束掉,Break和Stop不能夠共存 DoSomethingLong($"Parallel`{t}"); }); } Console.WriteLine("*************Parallel end************");
能夠多瞭解一下Parallel的接口,裏面的用法有不少,我這裏也只是列出了比較經常使用的幾個,像ParallelOptions類能夠控制併發數量,固然,不能夠也能夠,Parallel的重載方法不少,能夠本身看看
6:線程取消,異常處理
關於線程取消這塊呢,要記住一點,線程只能自身終結自身,也就是除非我自殺,不然你幹不掉我,不要妄想經過主線程來控制計算中的子線程。
關於線程異常處理這塊呢,想要捕獲子線程的異常,最好在子線程自己抓捕,也可使用Task的WaitAll,不過這種方法不推薦,若是用了,別忘了一點,catch裏面放的是AggregateException,不是Exception,否則捕捉不到別怪我
TaskFactory taskFactory = new TaskFactory(); //通知線程是否取消 CancellationTokenSource cts = new CancellationTokenSource(); List<Task> taskList = new List<Task>(); //想要主線程抓捕到子線程異常,必須使用Task.WaitAll,這種方法不推薦 //想要捕獲子線程的異常,最好在子線程自己抓捕 //徹底搞不懂啊,看懵逼了都 try { for (int i = 0; i < 40; i++) { int name = i; //在子線程中抓捕異常 Action<object> a = t => { try { Thread.Sleep(2000); Console.WriteLine(cts.IsCancellationRequested); if (cts.IsCancellationRequested) { Console.WriteLine($"放棄執行{name}"); } else { if (name == 1) { throw new Exception($"取消了線程{name}{t}"); } if (name == 5) { throw new Exception($"取消了線程{name}{t}"); } Console.WriteLine($"執行成功:{name}"); } } catch (Exception ex) { //通知線程取消,後面的都不執行 cts.Cancel(); Console.WriteLine(ex.Message); } }; taskList.Add(taskFactory.StartNew(a, name, cts.Token)); } Task.WaitAll(taskList.ToArray()); } catch (AggregateException ex) { foreach (var item in ex.InnerExceptions) { Console.WriteLine(item.Message); } }
7:給線程上個鎖,防止併發的時候數據丟失,覆蓋等
//先準備三個公共變量 private static int iIndex; private static object obj = new object(); private static List<int> indexList = new List<int>();
List<Task> taskList = new List<Task>(); //開啓1000個線程 for (int i = 0; i < 10000; i++) { int newI = i; Task task = Task.Factory.StartNew(() => { iIndex += 1; indexList.Add(newI); }); taskList.Add(task); } //等待全部線程完成 Task.WhenAll(taskList.ToArray()); //打印結果 Console.WriteLine(iIndex); Console.WriteLine(indexList.Count);
給大家看一下我這裏運行三次打印出的結果
第一次:
第二次:
第三次:
看的出來,仍是比較穩定的只丟失那麼幾個數據的對吧?
爲啥會這樣呢?由於當兩個線程同時操做一個數據的時候,你以爲會以誰的操做爲標準來保存?就好像咱們操做IO的時候,不容許多多個線程同時操做一個IO,由於計算機不知道以誰的標準來保存修改
下面給它加個鎖,稍微修改一下代碼:
List<Task> taskList = new List<Task>(); //開啓1000個線程 for (int i = 0; i < 10000; i++) { int newI = i; Task task = Task.Factory.StartNew(() => { //加個鎖 lock (objLock) { iIndex += 1; indexList.Add(newI); } }); taskList.Add(task); } //等待全部線程完成 Task.WhenAll(taskList.ToArray()); //打印結果 Console.WriteLine(iIndex); Console.WriteLine(indexList.Count);
在看一下運行結果:
線程鎖會破壞線程,增長耗時,下降效率,不要看效果很爽就處處加鎖,萬一你鑰匙丟了呢(死鎖);
共有變量:都能訪問的局部變量/全局變量/數據庫的值/硬盤文件,這些都有多是數據不安全的,若是有需求,仍是得加個鎖
若是確實是要用到鎖,推薦你們就使用一個:私有的,靜態的,object類型的變量就能夠了;
漏掉了一個方法,我給補上:
/// <summary> /// 一個比較耗時的方法,循環1000W次 /// </summary> /// <param name="name"></param> public static void DoSomethingLong(string name) { int iResult = 0; for (int i = 0; i < 1000000000; i++) { iResult += i; } Console.WriteLine($"********************{name}*******{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}****{Thread.CurrentThread.ManagedThreadId}****"); }