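A project of ours required writing a few programs that crawl information from some websites: small crawlers, basically. Crawling goes over the network, so a program that issues its requests one after another is slow, and I doubt anyone would build it that way. To crawl efficiently you have to fetch in parallel on multiple threads, and that is where a thread pool comes in. Every developer knows the idea of a pool: it exists to make sensible use of a resource.

My first thought was the ThreadPool that ships with the .NET Framework. It is built in, so performance and stability should not be a concern. After a few days of mulling it over, though, the built-in ThreadPool left me uneasy, for two reasons.

1. ThreadPool is a static class! Once the program is running, the pool is shared by the entire process. The .NET Framework is big, and besides our own code queuing tasks into this shared pool, who knows which other classes, libraries, or tasks are also pushing work items into it? If I set the shared pool's size to 10, fewer than 10 threads may actually be working for us, and the program will not perform the way we expect.

2. Our crawler is designed to sit there like a service: once started it keeps running until someone stops it by hand. When there is nothing to crawl, the pool should therefore shrink, or even drop all of its threads, so that idle threads do not hang around holding system resources. Because the built-in pool is shared, I would not dare resize it at will, and for a control freak of a programmer like me that is simply unacceptable.

The built-in ThreadPool is easy to use and powerful, and it can even tune the number of threads in the pool by itself, but I still decided to drop it, because I need a pool I can control!

So I went looking for a ready-made alternative. After a lot of searching on Baidu and Google, the only reasonably mature option I found in the .NET world was SmartThreadPool, and after a quick look I felt it was not what I wanted either. I decided to build my own. I searched for material on how thread pools are implemented and what to watch out for, and I read through some open-source implementations, such as SmartThreadPool for .NET and ThreadPoolExecutor in Java. I had never worked with Java, but Java and C# are close enough relatives that a .NET coder can read it without much trouble. Building on those earlier designs and mixing in my own ideas, my own pool gradually took shape...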
Based on the requirements, I first defined the basic interface:
using System;
using System.Threading;

public interface IThreadPool : IDisposable
{
    /// <summary>
    /// Size of the thread pool.
    /// </summary>
    int Threads { get; set; }

    /// <summary>
    /// A value in milliseconds. Timing starts when the last active thread finishes its task;
    /// if the pool receives no task within this time, all threads in the pool are released.
    /// A value below 0 means the threads are never released. Defaults to -1.
    /// </summary>
    double KeepAliveTime { get; set; }

    /// <summary>
    /// Gets the number of idle threads currently in the pool.
    /// </summary>
    int GetAvailableThreads();

    /// <summary>
    /// Gets the total number of work items currently in the pool.
    /// </summary>
    int GetWorkCount();

    /// <summary>
    /// Adds a work item to the pool.
    /// </summary>
    bool QueueWorkItem(WaitCallback callback, object state);
}
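As you can see, the interface has only a handful of members (the names are all borrowed from the built-in .NET ThreadPool, haha), and the comments in the code above describe what each one is for. Next, the core implementation. First, the WorkerThread class: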
using System;
using System.Threading;

internal class WorkerThread : IDisposable
{
    Thread _thread;
    AutoResetEvent _waitEvent;
    Action _action;
    bool _disposed = false;

    public WorkerThread()
    {
        this._waitEvent = new AutoResetEvent(false);
        this._thread = new Thread(this.Run);
        this._thread.IsBackground = true;
        this._thread.Start();
    }

    // Whether a work item is currently being executed
    public bool IsWorking { get; private set; }
    public event Action<WorkerThread> Complete;
    public int ThreadId { get { return this._thread.ManagedThreadId; } }
    public ThreadState ThreadState { get { return this._thread.ThreadState; } }

    public void SetWork(Action act)
    {
        this.CheckDisposed();
        if (this.IsWorking)
            throw new Exception("A work item is already running");
        this._action = act;
    }

    public void Activate()
    {
        this.CheckDisposed();
        if (this.IsWorking)
            throw new Exception("A work item is already running");
        if (this._action == null)
            throw new Exception("No work item has been set");
        this._waitEvent.Set();
    }

    void Run()
    {
        while (!this._disposed)
        {
            // Block here until Activate() signals that a work item is ready
            this._waitEvent.WaitOne();
            if (this._disposed)
                break;
            try
            {
                this.IsWorking = true;
                this._action();
            }
            catch (ThreadAbortException)
            {
                if (!this._disposed)
                    Thread.ResetAbort();
            }
            catch (Exception)
            {
                // Swallow exceptions from the work item so the thread survives
            }
            finally
            {
                this.IsWorking = false;
                this._action = null;
                var completeEvent = this.Complete;
                if (completeEvent != null)
                {
                    try
                    {
                        completeEvent(this);
                    }
                    catch
                    {
                    }
                }
            }
        }
    }

    void CheckDisposed()
    {
        if (this._disposed)
            throw new ObjectDisposedException(this.GetType().Name);
    }

    public void Dispose()
    {
        if (this._disposed)
            return;
        try
        {
            this._disposed = true;
            // Note: only call Abort when Dispose() is not being called from the thread
            // this object maintains. (Thread.Abort is supported on .NET Framework only.)
            if (Thread.CurrentThread.ManagedThreadId != this._thread.ManagedThreadId)
            {
                this._thread.Abort();
            }
        }
        finally
        {
            this._waitEvent.Dispose();
        }
    }
}
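This class is essentially a wrapper around a thread, which I call WorkerThread. Its job is to accept one task handed to it by the pool and execute it; objects of this class are what the pool maintains. Its core members are:

_thread: the underlying thread, created and started in the constructor as a background thread that runs Run().
_waitEvent: an AutoResetEvent that the thread blocks on while it has nothing to do.
SetWork(Action act): stores the work item to execute next.
Activate(): signals _waitEvent so the thread wakes up and executes the stored work item.
Run(): the thread's loop. It waits on _waitEvent, executes _action, clears it, and raises Complete.
Complete: an event raised after each work item finishes, which is how the pool learns that the worker is free again.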
WorkerThread is not much code, only a hundred lines or so. To sum up, its life is SetWork(Action act) -> Activate() -> _action() -> WaitOne() -> SetWork(Action act) -> Activate() ... in an endless loop. I hope that is clear.
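The cycle above can be tried in isolation with a stripped-down worker. This is only a sketch, not the article's class: `MiniWorker`, `Stop()` and `MiniWorkerDemo` are hypothetical names, and the worker shuts down cooperatively through a flag instead of `Thread.Abort`. It shows that repeated SetWork/Activate calls reuse one thread parked on an `AutoResetEvent`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified stand-in for WorkerThread: one thread parked on an
// AutoResetEvent that wakes up once per Activate() call.
public sealed class MiniWorker
{
    private readonly AutoResetEvent _wake = new AutoResetEvent(false);
    private readonly Thread _thread;
    private volatile bool _stopped;
    private Action _action;

    // Raised on the worker thread after each work item finishes.
    public event Action<MiniWorker> Complete;

    public MiniWorker()
    {
        _thread = new Thread(Run) { IsBackground = true };
        _thread.Start();
    }

    public void SetWork(Action action) { _action = action; }

    public void Activate() { _wake.Set(); }

    // Cooperative shutdown (an assumption of this sketch): wake the loop and let it exit.
    public void Stop()
    {
        _stopped = true;
        _wake.Set();
        _thread.Join();
    }

    private void Run()
    {
        while (true)
        {
            _wake.WaitOne();                 // park until Activate() or Stop()
            if (_stopped) break;
            try { _action(); }
            catch (Exception) { /* a failing work item must not kill the thread */ }
            finally
            {
                _action = null;              // cleared before Complete, so a handler may set new work
                var handler = Complete;
                if (handler != null) handler(this);
            }
        }
    }
}

public static class MiniWorkerDemo
{
    // Runs three work items one after another and returns the managed
    // thread id that executed each of them.
    public static List<int> Run()
    {
        var ids = new List<int>();
        var done = new AutoResetEvent(false);
        var worker = new MiniWorker();
        worker.Complete += w => done.Set();

        for (int i = 0; i < 3; i++)
        {
            worker.SetWork(() => ids.Add(Thread.CurrentThread.ManagedThreadId));
            worker.Activate();
            done.WaitOne();                  // wait for this item before handing over the next
        }
        worker.Stop();
        return ids;
    }
}
```

Calling `MiniWorkerDemo.Run()` returns three ids that all belong to the same worker thread, which is the whole point of keeping the thread parked between work items rather than creating a new one each time.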
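With WorkerThread out of the way, let's look at WorkerThreadPool, the class that actually implements the IThreadPool interface. A thread pool exists to maintain a certain number of reusable threads. WorkerThreadPool creates the WorkerThread objects, accepts tasks from the user, and hands each task to an idle thread in the pool to execute. That really is all there is to it. Without further ado, here is the code: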
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class WorkerThreadPool : IThreadPool, IDisposable
{
    readonly object _lockObject = new object();
    bool _disposed = false;
    // Every call to QueueWorkItem sets _spin to false, telling the timing thread to stop looping
    bool _spin = false;
    // Idle timeout: counted from the moment the last active thread finishes its task; if no task
    // is received within this time, all threads in the pool are released
    double _keepAliveTime = -1;
    // The point in time at which the threads are to be released
    DateTime _releaseTime = DateTime.Now;
    int _threads;
    List<WorkerThread> _allThreads;
    List<WorkerThread> _workingTreads;
    Queue<WorkerThread> _freeTreads;
    Queue<WorkItem> _workQueue;
    List<ReleaseThreadsRecord> _releaseThreadsRecords = new List<ReleaseThreadsRecord>();

    /// <summary>
    /// Creates a thread pool; Threads defaults to 25.
    /// </summary>
    public WorkerThreadPool()
        : this(25)
    {
    }

    /// <summary>
    /// Creates a thread pool of size threads.
    /// </summary>
    public WorkerThreadPool(int threads)
    {
        if (threads < 1)
            throw new ArgumentException("threads is less than 1");
        this._threads = threads;
        this._allThreads = new List<WorkerThread>(threads);
        this._workingTreads = new List<WorkerThread>(threads);
        this._freeTreads = new Queue<WorkerThread>(threads);
        this._workQueue = new Queue<WorkItem>();
    }

    ~WorkerThreadPool()
    {
        this.Dispose();
    }

    /// <summary>
    /// Size of the thread pool.
    /// </summary>
    public int Threads
    {
        get { return this.GetPoolSize(); }
        set { this.SetPoolSize(value); }
    }

    /// <summary>
    /// A value in milliseconds. Timing starts when the last active thread finishes its task;
    /// if the pool receives no task within this time, all threads in the pool are released.
    /// A value below 0 means the threads are never released. Defaults to -1.
    /// </summary>
    public double KeepAliveTime
    {
        get { return this._keepAliveTime; }
        set { this._keepAliveTime = value; }
    }

    int GetPoolSize()
    {
        return this._threads;
    }

    void SetPoolSize(int threads)
    {
        if (threads < 1)
            throw new ArgumentException("threads is less than 1");
        lock (this._lockObject)
        {
            this._threads = threads;
            this.AdjustPool();
            WorkerThread workerThread = null;
            WorkItem workItem = null;
            while (this.TryGetWorkerThreadAndWorkItem(out workerThread, out workItem, false))
            {
                this.ActivateWorkerThread(workerThread, workItem);
                workerThread = null;
                workItem = null;
            }
        }
    }

    /// <summary>
    /// Gets the number of idle threads currently in the pool.
    /// </summary>
    public int GetAvailableThreads()
    {
        lock (this._lockObject)
        {
            if (this._threads <= this._workingTreads.Count)
                return 0;
            int r = this._threads - this._workingTreads.Count;
            return r;
        }
    }

    /// <summary>
    /// Gets the total number of work items currently in the pool.
    /// </summary>
    public int GetWorkCount()
    {
        lock (this._lockObject)
        {
            return this._workQueue.Count + this._workingTreads.Count;
        }
    }

    /// <summary>
    /// Adds a work item to the pool.
    /// </summary>
    public bool QueueWorkItem(WaitCallback callback, object state)
    {
        if (callback == null)
            return false;
        WorkerThread workerThread = null;
        WorkItem workItem = null;
        lock (this._lockObject)
        {
            CheckDisposed();
            if (this._workQueue.Count == int.MaxValue)
                return false;
            this._workQueue.Enqueue(new WorkItem(callback, state));
            this._spin = false;
            if (!this.TryGetWorkerThreadAndWorkItem(out workerThread, out workItem, true))
            {
                return true;
            }
        }
        this.ActivateWorkerThread(workerThread, workItem);
        return true;
    }

    public List<ReleaseThreadsRecord> GetReleaseThreadsRecords()
    {
        lock (this._lockObject)
        {
            List<ReleaseThreadsRecord> list = new List<ReleaseThreadsRecord>(this._releaseThreadsRecords.Count);
            foreach (var releaseThreadsRecord in this._releaseThreadsRecords)
            {
                list.Add(releaseThreadsRecord);
            }
            return list;
        }
    }

    /// <summary>
    /// This method must be called while holding the lock.
    /// </summary>
    /// <param name="workerThread"></param>
    /// <param name="workItem"></param>
    /// <param name="workerThreadCall">Whether the caller is one of the pool's own threads</param>
    bool TryGetWorkerThreadAndWorkItem(out WorkerThread workerThread, out WorkItem workItem, bool workerThreadCall)
    {
        workerThread = null;
        workItem = null;
        if (this._workQueue.Count > 0)
        {
            if (this._freeTreads.Count > 0)
            {
                workerThread = this._freeTreads.Dequeue();
                workItem = this._workQueue.Dequeue();
                this._workingTreads.Add(workerThread);
                return true;
            }
            else
            {
                if (this._allThreads.Count < this._threads)
                {
                    workerThread = new WorkerThread();
                    workItem = this._workQueue.Dequeue();
                    this._allThreads.Add(workerThread);
                    this._workingTreads.Add(workerThread);
                    return true;
                }
                return false;
            }
        }
        else
        {
            if (!workerThreadCall)
                return false;
            double t = this._keepAliveTime;
            if (t < 0)
            {
                this._workQueue.TrimExcess();
                return false;
            }
            // This block is only reached after a pool thread has finished its work. A call coming
            // from QueueWorkItem never gets here, because this._workQueue.Count > 0 in that case.
            if (this._freeTreads.Count == this._allThreads.Count && this._workingTreads.Count == 0 && this._freeTreads.Count > 0)
            {
                /*
                 * Getting here means the pool has no tasks left and the last active thread has
                 * just finished. Take one thread out of the pool to run the Tick method.
                 */
                DateTime now = DateTime.Now;
                int threadId = Thread.CurrentThread.ManagedThreadId;
                // Since only pool threads can get here, this check is probably redundant
                if (this._allThreads.Any(a => a.ThreadId == threadId))
                {
                    // Dequeue a WorkerThread. It is NOT added to _workingTreads: it is used for the
                    // pool's internal timing and is invisible to the outside, so that callers of
                    // GetAvailableThreads still get a sensible result.
                    workerThread = this._freeTreads.Dequeue();
                    workItem = new WorkItem((state) =>
                    {
                        this.Tick((WorkerThread)state);
                    }, workerThread);
                    this._spin = true;
                    try
                    {
                        // Set the point in time at which the threads are to be released
                        this._releaseTime = now.AddMilliseconds(t);
                    }
                    catch (ArgumentOutOfRangeException)
                    {
                        this._releaseTime = DateTime.MaxValue;
                    }
                    return true;
                }
            }
            return false;
        }
    }

    void ActivateWorkerThread(WorkerThread workerThread, WorkItem workItem)
    {
        workerThread.SetWork(workItem.Execute);
        workerThread.Complete += this.WorkComplete;
        workerThread.Activate();
    }

    void WorkComplete(WorkerThread workerThread)
    {
        // So that the finalizer can still run, always remove this.WorkComplete from
        // workerThread.Complete here; it is added back when the workerThread is taken out again.
        workerThread.Complete -= this.WorkComplete;
        if (this._disposed)
            return;
        WorkerThread nextWorkerThread = null;
        WorkItem nextWorkItem = null;
        lock (this._lockObject)
        {
            if (this._disposed)
                return;
            this._workingTreads.Remove(workerThread);
            this._freeTreads.Enqueue(workerThread);
            this.AdjustPool();
            if (!this.TryGetWorkerThreadAndWorkItem(out nextWorkerThread, out nextWorkItem, true))
            {
                return;
            }
        }
        this.ActivateWorkerThread(nextWorkerThread, nextWorkItem);
    }

    /// <summary>
    /// This method must be called while holding the lock.
    /// </summary>
    void AdjustPool()
    {
        while (this._allThreads.Count > this._threads && this._freeTreads.Count > 0)
        {
            WorkerThread workerThread = this._freeTreads.Dequeue();
            this._allThreads.Remove(workerThread);
            workerThread.Dispose();
        }
    }

    /// <summary>
    /// Spins while timing, until it is told to stop or the release time is reached.
    /// </summary>
    void Tick(WorkerThread workerThread)
    {
        DateTime releaseTimeTemp = this._releaseTime;
        while (true)
        {
            lock (this._lockObject)
            {
                if (!this._spin)
                {
                    // _spin == false means we have been told to stop timing
                    break;
                }
                if (DateTime.Now >= releaseTimeTemp)
                {
                    // Stop timing and start releasing threads. First detach the WorkComplete handler
                    // added earlier so that WorkComplete is not invoked; once the loop ends, the
                    // current thread finishes naturally.
                    workerThread.Complete -= this.WorkComplete;
                    // Release all threads, including the current one
                    this.ReleaseThreads();
                    this._spin = false;
                    // All threads (including this one) have been released, so when this method
                    // returns the current thread simply runs to completion
                    break;
                }
            }
            Thread.SpinWait(1);
        }
    }

    void ReleaseThreads()
    {
        ReleaseThreadsRecord releaseThreadsRecord = new ReleaseThreadsRecord(DateTime.Now);
        foreach (WorkerThread thread in this._allThreads)
        {
            try
            {
                thread.Dispose();
            }
            catch
            {
            }
            releaseThreadsRecord.ThreadIds.Add(thread.ThreadId);
        }
        releaseThreadsRecord.ThreadIds.TrimExcess();
        this._releaseThreadsRecords.Add(releaseThreadsRecord);
        this._allThreads.Clear();
        this._freeTreads.Clear();
        this._workingTreads.Clear();
        this._workQueue.Clear();
        this._workQueue.TrimExcess();
    }

    void CheckDisposed()
    {
        if (this._disposed)
            throw new ObjectDisposedException(this.GetType().Name);
    }

    /// <summary>
    /// Releases this object's resources. Calling this method forcibly terminates all threads in the pool.
    /// </summary>
    public void Dispose()
    {
        if (this._disposed)
            return;
        // Release all threads
        lock (this._lockObject)
        {
            if (this._disposed)
                return;
            this.ReleaseThreads();
            this._threads = 0;
            this._disposed = true;
        }
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
    }

    class WorkItem
    {
        bool _executed = false;
        WaitCallback _callback;
        object _state;

        public WorkItem(WaitCallback callback, object state)
        {
            if (callback == null)
                throw new ArgumentNullException("callback");
            this._callback = callback;
            this._state = state;
        }

        public void Execute()
        {
            if (this._executed)
                throw new Exception("This WorkItem has already been executed");
            this._executed = true;
            this._callback(this._state);
        }
    }

    public class ReleaseThreadsRecord
    {
        public ReleaseThreadsRecord()
        {
            this.ThreadIds = new List<int>();
        }

        public ReleaseThreadsRecord(DateTime releaseTime)
        {
            this.ReleaseTime = releaseTime;
            this.ThreadIds = new List<int>();
        }

        /// <summary>
        /// Time of the release.
        /// </summary>
        public DateTime ReleaseTime { get; set; }

        /// <summary>
        /// Ids of the threads that were released.
        /// </summary>
        public List<int> ThreadIds { get; set; }

        public ReleaseThreadsRecord Clone()
        {
            ReleaseThreadsRecord record = new ReleaseThreadsRecord(this.ReleaseTime);
            record.ThreadIds = this.ThreadIds.ToList();
            return record;
        }
    }
}
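This class is quite a bit of code (it does take up some space). The code is commented throughout, but I would still like to walk briefly through how it works and through its core members, so that it is quicker to follow:

_threads: the pool size, that is, the maximum number of WorkerThread objects the pool will create.
_allThreads: every WorkerThread the pool has created.
_workingTreads: the workers that are currently executing a work item.
_freeTreads: the idle workers waiting for work.
_workQueue: the work items that have not yet been handed to a worker.
QueueWorkItem: enqueues a work item and, if a worker is available, dispatches it right away.
TryGetWorkerThreadAndWorkItem: pairs a queued work item with an idle worker (or a newly created one while the pool is below its size). When the queue is empty and the last worker has just finished, it picks one worker to run Tick instead.
WorkComplete: the callback a worker raises when it finishes, which returns the worker to _freeTreads and tries to dispatch the next item.
AdjustPool: disposes surplus idle workers after the pool size has been reduced.
Tick and ReleaseThreads: the idle timer and the routine that disposes every thread once it expires.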
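Those are the core fields and methods of WorkerThreadPool; I will not go through the remaining members in detail. To keep things manageable, the pool tracks thread state in two collections, _freeTreads and _workingTreads. Whenever a workerThread is taken from the idle collection _freeTreads to run a task, it must be added to _workingTreads. When a workerThread finishes its task it removes itself from _workingTreads and puts itself back into _freeTreads as an idle thread, ready for the next task. That is why WorkComplete always ends by calling TryGetWorkerThreadAndWorkItem to fetch an available WorkerThread and a pending task, and then runs it. This forms a loop: as long as there are tasks, the pool stays fully loaded.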
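The bookkeeping described above can be condensed into a much smaller pool. The following is a rough sketch under my own assumptions, not the article's implementation: `MiniPool`, `TryPair`, `Spawn` and `MiniPoolDemo` are hypothetical names, work items are plain `Action` delegates, and there is no resizing, idle release, or disposal. It only illustrates the free -> working -> free cycle and the "pull the next item on completion" step.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical miniature of the free/working bookkeeping; not the author's class.
public sealed class MiniPool
{
    private sealed class Worker
    {
        public readonly AutoResetEvent Wake = new AutoResetEvent(false);
        public Action Work;
    }

    private readonly object _lock = new object();
    private readonly Queue<Worker> _free = new Queue<Worker>();
    private readonly List<Worker> _working = new List<Worker>();
    private readonly Queue<Action> _queue = new Queue<Action>();
    private readonly int _size;
    private int _created;

    public MiniPool(int size) { _size = size; }

    public void QueueWorkItem(Action work)
    {
        Worker worker;
        lock (_lock)
        {
            _queue.Enqueue(work);
            worker = TryPair();
        }
        if (worker != null) worker.Wake.Set();   // activate outside the lock
    }

    // Must be called under _lock. Pairs the next queued item with a free
    // (or newly created) worker and moves that worker to the working list.
    private Worker TryPair()
    {
        if (_queue.Count == 0) return null;
        Worker worker;
        if (_free.Count > 0) worker = _free.Dequeue();
        else if (_created < _size) { worker = Spawn(); _created++; }
        else return null;                        // saturated: the item stays queued
        worker.Work = _queue.Dequeue();
        _working.Add(worker);
        return worker;
    }

    private Worker Spawn()
    {
        var worker = new Worker();
        var thread = new Thread(() =>
        {
            while (true)
            {
                worker.Wake.WaitOne();
                try { worker.Work(); } catch (Exception) { }
                Worker next;
                lock (_lock)
                {
                    // Completion step: working -> free, then immediately look for more work.
                    _working.Remove(worker);
                    _free.Enqueue(worker);
                    next = TryPair();
                }
                if (next != null) next.Wake.Set();
            }
        });
        thread.IsBackground = true;
        thread.Start();
        return worker;
    }
}

public static class MiniPoolDemo
{
    // Queues `items` short work items on a pool of `size` threads and returns
    // the highest number of items observed running at the same time.
    public static int PeakConcurrency(int size, int items)
    {
        var pool = new MiniPool(size);
        var done = new CountdownEvent(items);
        var gate = new object();
        int running = 0, peak = 0;
        for (int i = 0; i < items; i++)
        {
            pool.QueueWorkItem(() =>
            {
                lock (gate) { running++; if (running > peak) peak = running; }
                Thread.Sleep(20);
                lock (gate) { running--; }
                done.Signal();
            });
        }
        done.Wait();
        return peak;
    }
}
```

For example, `MiniPoolDemo.PeakConcurrency(3, 12)` never reports more than 3 items in flight, because a fourth item can only start when a finishing worker pulls it from the queue in its completion step.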
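At the start I mentioned a requirement: when there is nothing to crawl, the pool should shrink or even drop all of its threads, so that idle threads do not hang around holding system resources. That is why IThreadPool has the KeepAliveTime property. It lets you give the pool a time span; if the pool receives no task within that span, it destroys its own threads. The implementation in WorkerThreadPool is simple. After the last task has finished, the pool takes one idle workerThread and has it do the timing, which is the WorkerThreadPool.Tick method. Tick just spins and checks the clock; if the specified time passes without a new task arriving, the pool's threads are destroyed. The timing is admittedly crude. With my limited skills I could not think of a better way.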
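One possible alternative to spinning, offered only as a sketch and not as what WorkerThreadPool does, is to let `System.Threading.Timer` do the waiting. `IdleWatch`, `Arm`, `Disarm` and `IdleWatchDemo` are hypothetical names introduced here. A pool would call `Arm()` where it currently starts Tick and `Disarm()` where QueueWorkItem sets `_spin` to false. Note the trade-off: the timer callback is dispatched on the shared .NET ThreadPool, so this brings back a small dependency on the very pool the article set out to avoid.

```csharp
using System;
using System.Threading;

// Hypothetical helper: invokes onIdle once if no activity is reported for
// keepAliveMs milliseconds. No thread spins while the countdown is running.
public sealed class IdleWatch : IDisposable
{
    private readonly Timer _timer;
    private readonly int _keepAliveMs;

    public IdleWatch(int keepAliveMs, Action onIdle)
    {
        _keepAliveMs = keepAliveMs;
        // Created disarmed: a due time of Timeout.Infinite means "do not fire".
        _timer = new Timer(_ => onIdle(), null, Timeout.Infinite, Timeout.Infinite);
    }

    // Call when the last work item finishes: start (or restart) the countdown.
    public void Arm() { _timer.Change(_keepAliveMs, Timeout.Infinite); }

    // Call when new work arrives: cancel the pending release.
    public void Disarm() { _timer.Change(Timeout.Infinite, Timeout.Infinite); }

    public void Dispose() { _timer.Dispose(); }
}

public static class IdleWatchDemo
{
    // Returns two counts: how often the callback had fired after a cancelled
    // countdown, and how often it had fired after a countdown that expired.
    public static int[] Run()
    {
        int fired = 0;
        var released = new ManualResetEventSlim(false);
        using (var watch = new IdleWatch(50, () =>
        {
            Interlocked.Increment(ref fired);
            released.Set();
        }))
        {
            watch.Arm();
            watch.Disarm();                  // "new work arrived" before the deadline
            Thread.Sleep(150);               // give a wrongly armed timer time to fire
            int afterCancel = Volatile.Read(ref fired);

            watch.Arm();                     // "the pool went idle again"
            released.Wait(2000);
            return new[] { afterCancel, Volatile.Read(ref fired) };
        }
    }
}
```

In a real pool the `onIdle` callback would still have to take the pool's lock and re-check that no work arrived in the meantime, because a callback that has already started cannot be cancelled by `Disarm()`.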
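Our thread pool is simple in design and not especially powerful, but it suits our current program well, and at the very least I can use it with peace of mind. It has now been running on our servers for a year and a half without any problems. I have been a member of cnblogs for three years, and on such a good platform all I ever did was take, without once thinking of giving back. So I wanted to share something. This WorkerThreadPool is simple and contains no deep technology, but it is a small piece of work of my own. If you have suggestions, I would be very grateful!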
This is my first blog post, and I am nervous! Finally, thanks to cnblogs (博客園) for providing such a good platform for learning!