.NET異步多線程,Thread,ThreadPool,Task,Parallel,異常處理,線程取消

今天記錄一下異步多線程的進階歷史,以及簡單的使用方法數據庫

主要仍是以Task,Parallel爲主,畢竟用的比較多的如今就是這些了,再往前去的,除非是老項目,否則真的應該是挺少了,大概有個概念,就當瞭解一下進化史了編程

1:委託異步多線程,全部的異步都是基於委託來實現的安全

#region 委託異步多線程
{
  //委託異步多線程
  Stopwatch watch = new Stopwatch();
  watch.Start();
  Console.WriteLine($"開始執行了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")},,,,{Thread.CurrentThread.ManagedThreadId}");
  Action<string> act = DoSomethingLong;
  for (int i = 0; i < 5; i++)
  {
    //int ThreadId = Thread.CurrentThread.ManagedThreadId;//獲取當前線程ID
    string name = $"Async {i}";
    act.BeginInvoke(name, null, null);
  }
  watch.Stop();
  Console.WriteLine($"結束執行了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")},,,,{watch.ElapsedMilliseconds}");
}
#endregion

 

2:多線程的最老版本:Thread(好像是2.0的時候出的?記不得了)多線程

//Thread
//Thread默認屬於前臺線程,啓動後必須完成
//Thread有不少Api,但大多數都已經不用了
Console.WriteLine("Thread多線程開始了");
Stopwatch watch = new Stopwatch();
watch.Start();
//線程容器
List<Thread> list = new List<Thread>();
for (int i = 0; i < 5; i++)
{
       //int ThreadId = Thread.CurrentThread.ManagedThreadId;//獲取當前線程ID
       string name = $"Async {i}";
       ThreadStart start1 = () =>
       {
          DoSomethingLong(name);
       };
       Thread thread = new Thread(start1);
       thread.IsBackground = true;//設置爲後臺線程,關閉後當即退出
       thread.Start();
       list.Add(thread);
       //thread.Suspend();//暫停,已經不用了
       //thread.Resume();//恢復,已經不用了
       //thread.Abort();//銷燬線程
       //中止線程靠的不是外部力量,而是線程自身,外部修改信號量,線程檢測信號量
}
//判斷當前線程狀態,來作線程等待
while (list.Count(t => t.ThreadState != System.Threading.ThreadState.Stopped) > 0)
{
  Console.WriteLine("等待中.....");
  Thread.Sleep(
500); } //統計正確所有耗時,經過join來作線程等待 foreach (var item in list) {   item.Join();//線程等待,表示把thread線程任務join到當前線程,也就是當前線程等着thread任務完成   //等待完成後統計時間 } watch.Stop();

Thread的無返回值回調:併發

封裝一個方法異步

/// <summary>
/// 回調封裝,無返回值
/// </summary>
/// <param name="start"></param>
/// <param name="callback"></param>
private static void ThreadWithCallback(ThreadStart start, Action callback)
{
  ThreadStart nweStart = () =>
  {
    start.Invoke();
    callback.Invoke();
  };
  Thread thread = new Thread(nweStart);
  thread.Start();
}
//回調的委託
Action act = () =>
{
    Console.WriteLine("回調函數");
};
//要執行的異步操做
ThreadStart start = () =>
{
    Console.WriteLine("正常執行。。");
};
ThreadWithCallback(start, act);

有返回值的回調:函數

/// <summary>
/// 回調封裝,有返回值
/// 想要獲取返回值,必需要有一個等待的過程
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="func"></param>
/// <returns></returns>
private static Func<T> ThreadWithReturn<T>(Func<T> func)
{
  T t = default(T);
  //ThreadStart自己也是一個委託
  ThreadStart start = () =>
    {
      t = func.Invoke();
    };
  //開啓一個子線程
  Thread thread = new Thread(start);
  thread.Start();
  //返回一個委託,由於委託自己是不執行的,因此這裏返回出去的是尚未執行的委託
  //等在獲取結果的地方,調用該委託
  return () =>
  {
    //只是判斷狀態的方法
    while (thread.ThreadState != System.Threading.ThreadState.Stopped)
    {
      Thread.Sleep(500);
    }
    //使用線程等待
    //thread.Join();
    //以上兩種均可以
    return t;
  };
}
Func<int> func = () =>
{
  Console.WriteLine("正在執行。。。");
  Thread.Sleep(10000);
  Console.WriteLine("執行結束。。。");
  return DateTime.Now.Year;
};
Func<int> newfunc = ThreadWithReturn(func);
//這裏爲了方便測試,只管感覺回調的執行原理
Console.WriteLine("Else.....");
Thread.Sleep(100);
Console.WriteLine("Else.....");
Thread.Sleep(100);
Console.WriteLine("Else.....");
//執行回調函數裏return的委託獲取結果
//newfunc.Invoke();
Console.WriteLine($"有參數回調函數的回調結果:{newfunc.Invoke()}");

關於有返回值的回調,由於委託是在執行Invoke的時候纔會去調用委託的方法,因此在調用newfunc.Invoke()的時候,纔會去委託裏面抓取值,這裏會有一個等待的過程,等待線程執行完成測試

 

3:ThreadPool:線程池spa

線程池是在Thread後出的,算是一種很給力的進化pwa

在Thread的年代,線程是直接從計算機裏抓取的,而線程池的出現,就是給開發人員提供一個容器,能夠從容器中抓取線程,也就是線程池

好處就在於能夠避免頻繁的建立和銷燬線程,直接從線程池拿線程,用完在還回去,當不夠的時候,線程池在從計算機中給你分配,豈不是很爽?

線程池的數量是能夠控制的,最小線程數量:8

ThreadPool.QueueUserWorkItem(p =>
{
  //這裏的p是沒有值的
  Console.WriteLine(p);
  Thread.Sleep(2000);
  Console.WriteLine($"線程池線程,{Thread.CurrentThread.ManagedThreadId}");
});
ThreadPool.QueueUserWorkItem(p =>
{
  //這裏的p就是傳進來的值
  Console.WriteLine(p);
  Thread.Sleep(2000);
  Console.WriteLine($"線程池線程,帶參數,{Thread.CurrentThread.ManagedThreadId}");
}, "這裏是參數");

線程池用起來仍是很方便的,也能夠控制線程數量

線程池裏有兩種線程:普通線程,IO線程,我比較喜歡在操做IO的時候使用ThreadPool

int workerThreads = 0;
int completionPortThreads = 0;
//設置線程池的最大線程數量(普通線程,IO線程)
ThreadPool.SetMaxThreads(80, 80);
//設置線程池的最小線程數量(普通線程,IO線程)
ThreadPool.SetMinThreads(8, 8);
ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
Console.WriteLine($"當前可用最大普通線程:{workerThreads},IO:{completionPortThreads}");
ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
Console.WriteLine($"當前可用最小普通線程:{workerThreads},IO:{completionPortThreads}");

ThreadPool並無Thread的Join等待接口,那麼想讓ThreadPool等待要這麼作呢?

ManualResetEvent:通知一個或多個正在等待的線程已發生的事件,至關於發送了一個信號

mre.WaitOne:卡住當前主線程,一直等到信號修改成true的時候,纔會接着往下跑

//用來控制線程等待,false默認爲關閉狀態
ManualResetEvent mre = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(p =>
{
  DoSomethingLong("控制線程等待");
  Console.WriteLine($"線程池線程,帶參數,{Thread.CurrentThread.ManagedThreadId}");
  //通知線程,修改信號爲true
  mre.Set();
});
//阻止當前線程,直到等到信號爲true在繼續執行
mre.WaitOne();
//關閉線程,至關於設置成false
mre.Reset();
Console.WriteLine("信號被關閉了");
ThreadPool.QueueUserWorkItem(p =>
{
  Console.WriteLine("再次等待");
  mre.Set();
});
mre.WaitOne();

 

4:Task,這也是如今用的最多的了,畢竟是比較新的玩意

關於Task就介紹幾個最經常使用的接口,基本上就夠用了(一招鮮吃遍天)

Task.Factory.StartNew:建立一個新的線程,Task的線程也是從線程池中拿的(ThreadPool)

Task.WaitAny:等待一羣線程中的其中一個完成,這裏是卡主線程,一直等到一羣線程中的最快的一個完成,才能繼續往下執行(20年前我也差點被後面的給追上),打個簡單的比方:從三個地方獲取配置信息(數據庫,config,IO),同時開啓三個線程來訪問,誰快我用誰。

Task.WaitAll:等待全部線程完成,這裏也是卡主線程,一直等待全部子線程完成任務,才能繼續往下執行。

Task.ContinueWhenAny:回調形式的,任意一個線程完成後執行的後續動做,這個就跟WaitAny差很少,只不是是回調形式的。

Task.ContinueWhenAll:回調形式的,全部線程完成後執行的後續動做,理解同上

//線程容器
List<Task> taskList = new List<Task>(); Stopwatch watch = new Stopwatch(); watch.Start(); Console.WriteLine("************Task Begin**************"); //啓動5個線程 for (int i = 0; i < 5; i++) {   string name = $"Task:{i}";   Task task = Task.Factory.StartNew(() =>   {     DoSomethingLong(name);   });   taskList.Add(task); } //回調形式的,任意一個完成後執行的後續動做 Task any = Task.Factory.ContinueWhenAny(taskList.ToArray(), task => {   Console.WriteLine("ContinueWhenAny"); }); //回調形式的,所有任務完成後執行的後續動做 Task all = Task.Factory.ContinueWhenAll(taskList.ToArray(), tasks => {   Console.WriteLine($"ContinueWhenAll{tasks.Length}"); }); //把兩個回調也放到容器裏面,包含回調一塊兒等待 taskList.Add(any); taskList.Add(all); //WaitAny:線程等待,當前線程等待其中一個線程完成 Task.WaitAny(taskList.ToArray()); Console.WriteLine("WaitAny"); //WaitAll:線程等待,當前線程等待全部線程的完成 Task.WaitAll(taskList.ToArray()); Console.WriteLine("WaitAll"); Console.WriteLine($"************Task End**************{watch.ElapsedMilliseconds},{Thread.CurrentThread.ManagedThreadId}");

關於Task其實只要熟練掌握這幾個接口,基本上就夠了,徹底夠用

 

5:Parallel(並行編程):其實就是在Task基礎上又進行了一次封裝,固然,Parallel也有很酷炫功能

問:首先是爲何叫作並行編程,跟Task有什麼區別?

答:使用Task開啓子線程的時候,主線程是屬於空閒狀態,並不參與執行(我是領導,有5件事情須要處理,我安排了5個手下去作,而我自己就是觀望狀態 PS:究竟是領導。),Parallel開啓子線程的時候,主線程也會參與計算(我是領導,我有5件事情須要處理,我身爲領導,可是我很勤勞,因此我作了一件事情,另外四件事情安排4個手下去完成),很明顯,減小了開銷。

你覺得Parallel就只有這個炫酷的功能?大錯特錯,更炫酷的還有;

Parallel能夠控制線程的最大併發數量,啥意思?就是無論你是腦溢血,仍是心臟病,仍是動脈大出血,個人手術室只有2個,同時也只能給兩我的作手術,作完一個在進來一個,同時作完兩個,就同時在進來兩個,多了不行。

當前,你想使用Task,或者ThreadPool來實現這樣的效果也是能夠的,不過這就須要你動動腦筋了,固然,有微軟給封裝好的接口直接使用,豈不美哉?

 //並行編程 
Console.WriteLine($"*************Parallel start********{Thread.CurrentThread.ManagedThreadId}****");
//一次性執行1個或多個線程,效果相似:Task WaitAll,只不過Parallel的主線程也參與了計算
Parallel.Invoke(
  () => { DoSomethingLong("Parallel`1"); },
  () => { DoSomethingLong("Parallel`2"); },
  () => { DoSomethingLong("Parallel`3"); },
  () => { DoSomethingLong("Parallel`4"); },
  () => { DoSomethingLong("Parallel`5"); });
//定義要執行的線程數量
Parallel.For(0, 5, t =>
{
  int a = t;
  DoSomethingLong($"Parallel`{a}");
});
{
  ParallelOptions options = new ParallelOptions()
  {
    MaxDegreeOfParallelism = 3//執行線程的最大併發數量,執行完成一個,就接着開啓一個
  };
  //遍歷集合,根據集合數量執行線程數量
  Parallel.ForEach(new List<string> { "a", "b", "c", "d", "e", "f", "g" }, options, t =>
  {
    DoSomethingLong($"Parallel`{t}");
  });
}
{
  ParallelOptions options = new ParallelOptions()
  {
    MaxDegreeOfParallelism = 3//執行線程的最大併發數量,執行完成一個,就接着開啓一個
  };
  //遍歷集合,根據集合數量執行線程數量
  Parallel.ForEach(new List<string> { "a", "b", "c", "d", "e", "f", "g" }, options, (t, status) =>
  {
    //status.Break();//這一次結束。
    //status.Stop();//整個Parallel結束掉,Break和Stop不能夠共存
    DoSomethingLong($"Parallel`{t}");
  });
}
Console.WriteLine("*************Parallel end************");

能夠多瞭解一下Parallel的接口,裏面的用法有不少,我這裏也只是列出了比較經常使用的幾個,像ParallelOptions類能夠控制併發數量,固然,不能夠也能夠,Parallel的重載方法不少,能夠本身看看

 

6:線程取消,異常處理

關於線程取消這塊呢,要記住一點,線程只能自身終結自身,也就是除非我自殺,不然你幹不掉我,不要妄想經過主線程來控制計算中的子線程。

關於線程異常處理這塊呢,想要捕獲子線程的異常,最好在子線程自己抓捕,也可使用Task的WaitAll,不過這種方法不推薦,若是用了,別忘了一點,catch裏面放的是AggregateException,不是Exception,否則捕捉不到別怪我

TaskFactory taskFactory = new TaskFactory();
//通知線程是否取消
CancellationTokenSource cts = new CancellationTokenSource();
List<Task> taskList = new List<Task>();
//想要主線程抓捕到子線程異常,必須使用Task.WaitAll,這種方法不推薦
//想要捕獲子線程的異常,最好在子線程自己抓捕
//徹底搞不懂啊,看懵逼了都
try
{
  for (int i = 0; i < 40; i++)
  {
    int name = i;
    //在子線程中抓捕異常
    Action<object> a = t =>
    {
      try
      {
        Thread.Sleep(2000);
        Console.WriteLine(cts.IsCancellationRequested);
        if (cts.IsCancellationRequested)
        {
          Console.WriteLine($"放棄執行{name}");
        }
        else
        {
          if (name == 1)
          {
            throw new Exception($"取消了線程{name}{t}");
          }
          if (name == 5)
          {
            throw new Exception($"取消了線程{name}{t}");
          }
          Console.WriteLine($"執行成功:{name}");
        }
      }
      catch (Exception ex)
      {
        //通知線程取消,後面的都不執行
        cts.Cancel();
        Console.WriteLine(ex.Message);
      }
    };
  taskList.Add(taskFactory.StartNew(a, name, cts.Token));
  }  
  Task.WaitAll(taskList.ToArray());
}
catch (AggregateException ex)
{
  foreach (var item in ex.InnerExceptions)
  {
    Console.WriteLine(item.Message);
  }
}

 

7:給線程上個鎖,防止併發的時候數據丟失,覆蓋等

//先準備三個公共變量
private static int iIndex;
private static object obj = new object();
private static List<int> indexList = new List<int>();
List<Task> taskList = new List<Task>();
//開啓1000個線程
for (int i = 0; i < 10000; i++)
{
  int newI = i;
  Task task = Task.Factory.StartNew(() =>
  {
  iIndex += 1;
  indexList.Add(newI);
  });
  taskList.Add(task);
}
//等待全部線程完成
Task.WhenAll(taskList.ToArray());
//打印結果
Console.WriteLine(iIndex);
Console.WriteLine(indexList.Count);

給大家看一下我這裏運行三次打印出的結果

第一次:

第二次:

第三次:

看的出來,仍是比較穩定的只丟失那麼幾個數據的對吧?

爲啥會這樣呢?由於當兩個線程同時操做一個數據的時候,你以爲會以誰的操做爲標準來保存?就好像咱們操做IO的時候,不容許多多個線程同時操做一個IO,由於計算機不知道以誰的標準來保存修改

下面給它加個鎖,稍微修改一下代碼:

List<Task> taskList = new List<Task>();
//開啓1000個線程
for (int i = 0; i < 10000; i++)
{
    int newI = i;
    Task task = Task.Factory.StartNew(() =>
    {
        //加個鎖
        lock (objLock)
        {
            iIndex += 1;
            indexList.Add(newI);
        }
    });
    taskList.Add(task);
}
//等待全部線程完成
Task.WhenAll(taskList.ToArray());
//打印結果
Console.WriteLine(iIndex);
Console.WriteLine(indexList.Count);

在看一下運行結果:

線程鎖會破壞線程,增長耗時,下降效率,不要看效果很爽就處處加鎖,萬一你鑰匙丟了呢(死鎖);

共有變量:都能訪問的局部變量/全局變量/數據庫的值/硬盤文件,這些都有多是數據不安全的,若是有需求,仍是得加個鎖

若是確實是要用到鎖,推薦你們就使用一個:私有的,靜態的,object類型的變量就能夠了;

 

漏掉了一個方法,我給補上:

/// <summary>
/// 一個比較耗時的方法,循環1000W次
/// </summary>
/// <param name="name"></param>
public static void DoSomethingLong(string name)
{
    int iResult = 0;
    for (int i = 0; i < 1000000000; i++)
    {
        iResult += i;
    }
    Console.WriteLine($"********************{name}*******{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}****{Thread.CurrentThread.ManagedThreadId}****");
}
相關文章
相關標籤/搜索