Nutshell.ThreadWorkerPool .Net線程池設計

功能描述:html

  1. 支持建立多個線程池,並統一管理
  2. 支持不一樣線程池的容量控制,以及最少活動線程的設置
  3. 支持不一樣線程池中活動線程的閒時設置,即線程空閒時間到期後即自動被回收

結構設計:git

  • ThreadWorkerPoolManager: 線程池管理器,用於統一建立,獲取,銷燬線程池,使用單例模式
  • ThreadWorkerPool: 線程池,用於管理指定數量的線程,由ThreadWorkerPoolManager管理,自身沒法建立與銷燬
  • TheadWorkerPoolItem: 線程池項,用於包裝線程工做器,協助ThreadWorkerPool更好的管理線程,例如取出,放回,閒時的控制
  • TheadWorker: 線程工做器,用於包裝系統線程System.Threading.Thread,使其能夠重複使用,減小Thrad建立和銷燬的性能開銷

  結構關係圖:github

  

 詳細設計:shell

  ThreadWoker多線程

  要點設計:併發

  1. 完成一次任務後,System.Threading.Thread不能被系統銷燬, 默認狀況下new Thread(ThreadStart start).Start(), 當ThreadStart委託的任務完成後,系統將銷燬該線程,也就是說建立一個System.Threading.Thread實例只能使用一次;爲了使線程能被重複使用,ThreadWoker將使用 while+sleeping 的方式對系統線程進行包裝,同時使用AutoResetEvent代替Thread.Sleep(timeout)來達到更佳的控制
  2. 閒時設計,線程資源是極其寶貴的系統資源,若是線程池中存在大量的空閒線程這是一種浪費,極端狀況下將影響系統的穩定性和工做效率;ThreadWorker將使用AutoResetEvent和事件通知的方式來代替在線程池中按期輪詢檢查的方式,每完成一個任務將從新開始空閒時間的計算,若是ThreadWorker在線程池中被取出,那麼ThreadWorker空閒時間將永遠不會到期,直到ThreadWorker被返回線程池後才從新開始空閒時間的計算

  狀態圖:高併發

  

  關鍵代碼:性能

  

 1         private void ThreadWorking()
 2         {
 3             while (_status != ThreadWorkerStatus.Abort)
 4             {
 5                 //WaitOne 返回false表示等待超時,true接到取消等待的通知
 6                 //這裏利用AutoResetEvent.WaitOne的特性來設計閒時控制,false表示空閒到期,true表示新的任務開始
 7                 if (!_waitEvent.WaitOne(_idleTime)) 
 8                 {
 9                     if (!_isCanIdleExpired) //_isCanIdleExpired變量控制是否容許超時,例如被取出後將不能超時
10                         continue;
11 
12                     _status = ThreadWorkerStatus.Abort;
13                     _waitEvent.Close();
14                     _waitEvent.Dispose();
15                     if (OnIdleExpired != null)
16                         OnIdleExpired(this, null); //空閒到期事件通知
17                     return;
18                 }
19                 else if (_status == ThreadWorkerStatus.Abort)
20                     return;
21 
22                 try
23                 {
24                     Working();
25                 }
26                 catch (Exception ex)
27                 {
28                     _logger.TraceEvent(TraceEventType.Error, (int)TraceEventType.Error, ex.ToString());
29                 }
30                 finally
31                 {
32                     _status = ThreadWorkerStatus.Idle;
33                     if (OnWorkCompleted != null)
34                         OnWorkCompleted(this, null); //任務完成事件通知
35                 }
36             }
37         }
 1      public void Work()
 2         {
 3             if (_status == ThreadWorkerStatus.Abort)
 4                 throw new InvalidOperationException("this ThreadWorker was Abort!");
 5 
 6             if (_status == ThreadWorkerStatus.Working)
 7                 throw new InvalidOperationException("this ThreadWorker was working, unable to duplicate work!");
 8 
 9             _status = ThreadWorkerStatus.Working;
10             _waitEvent.Set(); //通知線程有個新的工做要開始
11         }

 

  ThreadWorkerPoolItemthis

  要點設計:spa

  1. 連接ThreadWorker和線程池,線程池經過ThreadWorkerPoolItem控制ThreadWorker在線程池的取出,放回,銷燬
  2. 經過訂閱ThreadWorker的空閒到期事件OnIdleExpired,來完成線程池對線程的移除
  3. 經過訂閱ThreadWorker的任務完成事件OnWorkCompleted,來完成線程返回線程池的操做
  4. 提供剩餘空閒時間查詢,來爲線程池提供更優線程取出方案

  完整代碼:

 1     public sealed class ThreadWorkerPoolItem
 2     {
 3         private ThreadWorkerPoolItemStatus _status;
 4         private readonly ThreadWorkerBase _threadWorker;
 5         private readonly ThreadWorkerPoolBase _threadWorkerPool;
 6         private readonly int _idleTime;
 7         private DateTime _startIdleTime;
 8 
 9         internal ThreadWorkerPoolItem(ThreadWorkerPoolBase pool, ThreadWorkerBase threadWorker, ThreadWorkerPoolSettings poolSettings)
10         {
11             _threadWorkerPool = pool;
12             _threadWorker = threadWorker;
13             _threadWorker.OnIdleExpired += _threadWorker_OnIdleExpired;
14             _threadWorker.OnWorkCompleted += _threadWorker_OnWorkCompleted;
15             _threadWorker.Start();
16             _status = ThreadWorkerPoolItemStatus.Idle;
17             _idleTime = poolSettings.IdleTime;
18         }
19 
20         void _threadWorker_OnWorkCompleted(object sender, EventArgs args)
21         {
22             _threadWorkerPool.Return(this);
23         }
24 
25         void _threadWorker_OnIdleExpired(object sender, EventArgs args)
26         {
27             _threadWorkerPool.Remove(this);
28         }
29 
30         internal ThreadWorkerPoolItemStatus Status
31         {
32             get
33             {
34                 if (_threadWorker.Status == ThreadWorkerStatus.Abort || _status == ThreadWorkerPoolItemStatus.Abort)
35                     return ThreadWorkerPoolItemStatus.Abort;
36 
37                 return _status;
38             }
39         }
40 
41         internal int SurplusIdleTime
42         {
43             get
44             {
45                 if (_status == ThreadWorkerPoolItemStatus.Take || _idleTime == -1)
46                     return -1;
47 
48                 int idledTime = (int)(_startIdleTime - DateTime.Now).TotalMilliseconds;
49                 if (idledTime >= _idleTime)
50                     return 0;
51 
52                 return idledTime;
53             }
54         }
55 
56         internal void SetTake()
57         {
58             _threadWorker.IsCanIdleExpried = false;
59             _status = ThreadWorkerPoolItemStatus.Take;
60         }
61 
62         internal void SetIdle()
63         {
64             _startIdleTime = DateTime.Now;
65             _status = ThreadWorkerPoolItemStatus.Idle;
66             _threadWorker.IsCanIdleExpried = true;
67         }
68 
69         internal ThreadWorkerBase ThreadWorker
70         {
71             get { return _threadWorker; }
72         }
73     }

 

  ThreadWorkerPool

  要點設計:

  1. 使用Lock配合ThreadWorkerPoolItem的狀態來確保多線程下,每次取出的都是空閒的ThreadWorker
  2. 取出的超時設計,因爲線程池有容量控制,高併發下必然致使線程池滿負荷,提供超時設置,有利於使用者自行控制滿負荷狀況下的處理;ThreadWorkerPool將使用while+sleeping的方式,同時使用AutoResetEvent代替Thread.Sleep(timeout)來達到更佳的控制,當一個線程被放回線程池時,另外一等待獲取者當即獲取,而無需等待下一次輪詢的到來

  關鍵代碼:

 1         protected bool TryTake(int timeout, out ThreadWorkerBase threadWorker)
 2         {
 3             threadWorker = null;
 4             lock (_takeLocker)
 5             {
 6                 ThreadWorkerPoolItem worker = null;
 7                 DateTime startWaitTime;
 8                 while (!_isDestoryed)
 9                 {
10                     worker = _threadWorkerList.Where(e => e.Status == Core.ThreadWorkerPoolItemStatus.Idle).OrderByDescending(e => e.SurplusIdleTime).FirstOrDefault();
11                     if (worker == null)
12                     {
13                         if (_threadWorkerList.Count < _settings.MaxThreadWorkerCount)
14                         {
15                             worker = this.CreatePoolItem(_threadWorkerList.Count + 1, _settings.IdleTime);
16                             worker.SetTake();
17                             _threadWorkerList.Add(worker);
18                             threadWorker = worker.ThreadWorker;
19                             return true;
20                         }
21 
22                         startWaitTime = DateTime.Now;
23                         if (!_takeWaitEvent.WaitOne(timeout))
24                         {
25                             threadWorker = null;
26                             return false;
27                         }
28 
29                         if (timeout != -1)
30                         {
31                             timeout = timeout - (int)(DateTime.Now - startWaitTime).TotalMilliseconds;
32                             if (timeout <= 0)
33                             {
34                                 threadWorker = null;
35                                 return false;
36                             }
37                         }
38                         continue;
39                     }
40 
41                     threadWorker = worker.ThreadWorker;
42                     worker.SetTake();
43                     return true;
44                 }
45 
46                 threadWorker = null;
47                 return false;
48             }
49         }
1         internal void Return(ThreadWorkerPoolItem item)
2         {
3             item.SetIdle();
4             _takeWaitEvent.Set();
5         }

  ThreadWorkerPoolManager使用單例模式管理,代碼過於簡單這裏就不貼了......

  有興趣的同窗能夠點擊這裏進行下載源碼查看:Nutshell.ThreadWorkerPool.zip

  github 開源地址: https://github.com/zcylife/Nutshell

相關文章
相關標籤/搜索