分享一個自制的 .net線程池

扯淡

因爲項目需求,須要開發一些程序去爬取一些網站的信息,算是小爬蟲程序吧。爬網頁這東西是要通過網絡傳輸,若是程序運行起來串行執行請求爬取,會很慢,我想沒人會這樣作。爲了提升爬取效率,必須使用多線程並行爬取。這時候就須要線程池了。池的概念,我想作開發的都應該知道,目的就是對資源的合理運用。剛開始的時候,我首先想到的就是 .net 框架下的線程池 ThreadPool,畢竟是自帶的,在性能、穩定性方面確定沒問題。但在琢磨了幾天後,.net 框架下自帶的這個 ThreadPool 讓我很不放心。1.ThreadPool 是一個靜態類!!也就是說,當程序運行起來之後,這個池是整個應用程序域共享的,.net 框架很大,程序運行了之後,除了我們本身往這個共享池裏塞任務,誰知道有沒有其餘的類、代碼、任務也會往裏塞 workItem 呢?也就是說,假如我設置這個共享池大小爲 10,但實際爲咱們工做的線程會不到 10 個,這就會致使程序運行時達不到咱們預期的效果。2.目前咱們的爬蟲程序設計是像一個服務同樣掛着,只要程序啓動了之後就會一直運行着,除非手動中止。所以,在沒有爬取任務的時候,須要減小甚至清空池內的全部線程,以避免池內線程一直掛着佔用系統資源。因爲 .net 自帶的這個線程池是共享的,我還真不敢隨意調整它的大小,對於我這種控制慾極強的程序員來講,這是萬萬接受不了的。雖然.net 自帶的 ThreadPool 用法簡單,功能強大,並且它還能夠智能的調節池內線程池數量,但我仍是決定拋棄它,由於,我須要一個可控的線程池!因而開始到網上處處查找有沒有其它現成的線程池。百度、谷歌了很久,發如今.net界比較成熟的就 SmartThreadPool,對 SmartThreadPool  簡單瞭解之後,仍是以爲它不是我想要的,因而決定,自造一個。因而,藉助強大的網絡,又開始網上處處搜索有關線程池如何實現以及相關注意事項之類的信息,也拜讀過一些網上開源的項目,如 .net 的 SmartThreadPool、java 裏的 ThreadPoolExecutor 等,雖然沒接觸過 java,但 java 和 C# 猶如親兄弟,大同小異,讓我這 .net coder 讀起來不是很費勁。基於前人的實現思路,再融入本身的思想,腦中本身的池也慢慢浮現…java

線程池:IThreadPool

根據需求,首先定義基本接口程序員

public interface IThreadPool : IDisposable
    {
        /// <summary>
        /// 線程池大小
        /// </summary>
        int Threads { get; set; }
        /// <summary>
        /// 一個以毫秒爲單位的值,表示從最後一個活動的線程執行完任務後開始計時,在指定的時間內線程池都沒有接收到任何任務,則釋放掉池內的全部線程。若設置值小於 0,則不會釋放池內線程。如未指定,默認爲 -1。
        /// </summary>
        double KeepAliveTime { get; set; }
        /// <summary>
        /// 獲取當前線程池內的空閒線程數量
        /// </summary>
        /// <returns></returns>
        int GetAvailableThreads();
        /// <summary>
        /// 獲取當前線程池內工做項總數
        /// </summary>
        /// <returns></returns>
        int GetWorkCount();
        /// <summary>
        /// 向線程池中添加工做項
        /// </summary>
        /// <param name="callback"></param>
        /// <param name="state"></param>
        /// <returns></returns>
        bool QueueWorkItem(WaitCallback callback, object state);
    }
View Code

能夠看到,我定義的線程池接口成員就幾個(命名都是來自 .net 自帶 ThreadPool,哈哈),它們的用途上面代碼也都帶有註釋。接下來,咱們看下核心實現。首先,介紹類 WorkerThread:服務器

internal class WorkerThread : IDisposable
{
    Thread _thread;
    AutoResetEvent _waitEvent;
    Action _action;
    bool _disposed = false;

    public WorkerThread()
    {
        this._waitEvent = new AutoResetEvent(false);
        this._thread = new Thread(this.Run);
        this._thread.IsBackground = true;
        this._thread.Start();
    }

    //是否正在執行工做
    public bool IsWorking
    {
        get;
        private set;
    }

    public event Action<WorkerThread> Complete;
    public int ThreadId
    {
        get
        {
            return this._thread.ManagedThreadId;
        }
    }
    public ThreadState ThreadState
    {
        get
        {
            return this._thread.ThreadState;
        }
    }

    public void SetWork(Action act)
    {
        this.CheckDisposed();

        if (this.IsWorking)
            throw new Exception("正在執行工做項");

        this._action = act;
    }

    public void Activate()
    {
        this.CheckDisposed();
        if (this.IsWorking)
            throw new Exception("正在執行工做項");
        if (this._action == null)
            throw new Exception("未設置任何工做項");

        this._waitEvent.Set();
    }

    void Run()
    {
        while (!this._disposed)
        {
            this._waitEvent.WaitOne();
            if (this._disposed)
                break;

            try
            {
                this.IsWorking = true;
                this._action();
            }
            catch (ThreadAbortException)
            {
                if (!this._disposed)
                    Thread.ResetAbort();
            }
            catch (Exception)
            {
            }
            finally
            {
                this.IsWorking = false;
                this._action = null;
                var completeEvent = this.Complete;
                if (completeEvent != null)
                {
                    try
                    {
                        completeEvent(this);
                    }
                    catch
                    {
                    }
                }
            }
        }
    }

    void CheckDisposed()
    {
        if (this._disposed)
            throw new ObjectDisposedException(this.GetType().Name);
    }

    public void Dispose()
    {
        if (this._disposed)
            return;

        try
        {
            this._disposed = true;
            //注意:只有調用 Dispose() 的不是當前對象維護的線程才調用 Abort
            if (Thread.CurrentThread.ManagedThreadId != this._thread.ManagedThreadId)
            {
                this._thread.Abort();
            }
        }
        finally
        {
            this._waitEvent.Dispose();
        }
    }
}
View Code

這個類主要是對線程的封裝,我稱之爲 WorkerThread。它主要負責接收線程池分配的一個任務,而後執行,線程池內維護的就是這麼一個類對象。介紹下它的幾個核心成員:網絡

  • _action:一個 Action 類型的字段。也就是一個委託,也就是 WorkerThread 要執行的任務,經過 SetWork(Action act) 方法設置值
  • _thread:一個 Thread 對象。也就是真正執行任務的線程
  • _waitEvent:一個 AutoResetEvent 對象。線程池的做用就是池內維護必定數量可重複利用的線程,線程執行每一個任務之後,並非直接「天然」結束,而是繼續執行下一個任務,當沒有可執行的任務的時候就會駐留在線程池內等待有可執行的任務。因此,如何讓一個線程處於等待狀態,而不是直接「天然」結束呢?我是經過這個 AutoResetEvent 對象去控制的。這個對象主要有 WaitOne() 和 Set() 兩個方法,WaitOne() 用於「卡」住線程,讓線程處於停滯狀態,Set() 就是用於通知線程繼續執行(關於這兩個方法的使用以及介紹我就通俗的說明下,若是不熟悉這個類的同窗能夠自行查 msdn)。這兩個方法,在配合一個 while 循環,基本就實現了線程的複用,具體看 Run() 方法。
  • Complete:一個 Action<WorkerThread> 類型的事件。每次執行完任務都會調用該事件,做用就是通知其所在線程池,說明」我「已經執行完」你「分配的任務了。
  • SetWork(Action act):設置線程要執行的任務,其實就是設置字段 _action 的值。這個方法是提供給線程池用的,每次給 WorkerThread 分配任務都是經過調用這個方法。
  • Activate():激活 WorkerThread 執行任務。調用了 SetWork(Action act) 分配任務了之後,就會調用該方法執行任務。這方法裏主要就是調用 _waitEvent.Set() 方法觸發 _thread 線程繼續執行。
  • Run():這是 WorkerThread 對象的核心。建立 _thread 時,給 _thread 設置執行的就是這個 Run() 方法。這個方法內實現就是一個 while 循環,每循環一次就會調用 _waitEvent.WaitOne() 「卡「住線程,直到被調用 Activate() 纔會執行後續代碼,後續代碼也就是執行真正的任務 _action。執行完任務了之後進入到下一個循環等待,直到接收下一個任務和被再次調用 Activate()…如此循環…. 從而達到了咱們循環利用線程的目的

WorkerThread 這個類代碼也不是不少,百來行而已。總結來講它做用就是 SetWork(Action act) –> Activate() –> _action() –> WaitOne() –> SetWork(Action act) –> Activate()…無限循環…但願你們看得明白。多線程

瞭解 WorkerThread 了之後,再來看下真正實現 IThreadPool 接口的類 WorkerThreadPool。前面提到,線程池的做用就是池內維護必定數量可重複利用的線程,WorkerThreadPool 負責 WorkerThread 的建立、接收用戶任務並將任務分配給池內空閒的線程去執行用戶任務,功能其實就這麼簡單。廢話很少說,直接上代碼:框架

public class WorkerThreadPool : IThreadPool, IDisposable
    {
        readonly object _lockObject = new object();
        bool _disposed = false;

        bool _spin = false; //spin 每調用一次 QueueWorkItem 方法都會將 spin 設爲 false,以通知計時線程中止循環了
        double _keepAliveTime = -1;//等待時間,表示從最後一個活動的線程執行完任務後開始計時到必定的時間內都沒有接受到任何任務,則釋放掉池內的全部線程
        DateTime _releaseTime = DateTime.Now;//釋放線程的時間點

        int _threads;
        List<WorkerThread> _allThreads;
        List<WorkerThread> _workingTreads;
        Queue<WorkerThread> _freeTreads;
        Queue<WorkItem> _workQueue;

        List<ReleaseThreadsRecord> _releaseThreadsRecords = new List<ReleaseThreadsRecord>();

        /// <summary>
        /// 建立一個線程池,默認 Threads 爲 25
        /// </summary>
        public WorkerThreadPool()
            : this(25)
        {

        }
        /// <summary>
        /// 建立一個大小爲 threads 的線程池
        /// </summary>
        /// <param name="threads"></param>
        public WorkerThreadPool(int threads)
        {
            if (threads < 1)
                throw new ArgumentException("threads 小於 1");

            this._threads = threads;
            this._allThreads = new List<WorkerThread>(threads);
            this._workingTreads = new List<WorkerThread>(threads);
            this._freeTreads = new Queue<WorkerThread>(threads);
            this._workQueue = new Queue<WorkItem>();
        }

        /// <summary>
        /// 
        /// </summary>
        ~WorkerThreadPool()
        {
            this.Dispose();
        }

        /// <summary>
        /// 線程池大小
        /// </summary>
        public int Threads
        {
            get
            {
                return this.GetPoolSize();
            }
            set
            {
                this.SetPoolSize(value);
            }
        }

        /// <summary>
        /// 一個以毫秒爲單位的值,表示從最後一個活動的線程執行完任務後開始計時,在指定的時間內線程池都沒有接受到任何任務,則釋放掉池內的全部線程。若設置值小於 0,則不會釋放池內線程。如未指定,默認爲 -1。
        /// </summary>
        public double KeepAliveTime
        {
            get
            {
                return this._keepAliveTime;
            }
            set
            {
                this._keepAliveTime = value;
            }
        }

        int GetPoolSize()
        {
            return this._threads;
        }
        void SetPoolSize(int threads)
        {
            if (threads < 1)
                throw new ArgumentException("threads 小於 1");

            lock (this._lockObject)
            {
                this._threads = threads;
                this.AdjustPool();

                WorkerThread workerThread = null;
                WorkItem workItem = null;
                while (this.TryGetWorkerThreadAndWorkItem(out  workerThread, out  workItem, false))
                {
                    this.ActivateWorkerThread(workerThread, workItem);
                    workerThread = null;
                    workItem = null;
                }
            }
        }

        /// <summary>
        /// 獲取當前線程池內的空閒線程數量
        /// </summary>
        /// <returns></returns>
        public int GetAvailableThreads()
        {
            lock (this._lockObject)
            {
                if (this._threads <= this._workingTreads.Count)
                    return 0;

                int r = this._threads - this._workingTreads.Count;
                return r;
            }
        }

        /// <summary>
        /// 獲取當前線程池內工做項總數
        /// </summary>
        /// <returns></returns>
        public int GetWorkCount()
        {
            lock (this._lockObject)
            {
                return this._workQueue.Count + this._workingTreads.Count;
            }
        }

        /// <summary>
        /// 向線程池中添加工做項
        /// </summary>
        /// <param name="callback"></param>
        /// <param name="state"></param>
        /// <returns></returns>
        public bool QueueWorkItem(WaitCallback callback, object state)
        {
            if (callback == null)
                return false;

            WorkerThread workerThread = null;
            WorkItem workItem = null;

            lock (this._lockObject)
            {
                CheckDisposed();

                if (this._workQueue.Count == int.MaxValue)
                    return false;

                this._workQueue.Enqueue(new WorkItem(callback, state));
                this._spin = false;

                if (!this.TryGetWorkerThreadAndWorkItem(out  workerThread, out  workItem, true))
                {
                    return true;
                }
            }

            this.ActivateWorkerThread(workerThread, workItem);
            return true;
        }

        public List<ReleaseThreadsRecord> GetReleaseThreadsRecords()
        {
            lock (this._lockObject)
            {
                List<ReleaseThreadsRecord> list = new List<ReleaseThreadsRecord>(this._releaseThreadsRecords.Count);
                foreach (var releaseThreadsRecord in this._releaseThreadsRecords)
                {
                    list.Add(releaseThreadsRecord);
                }
                return list;
            }
        }

        /// <summary>
        /// 該方法必須在 locked 下執行
        /// </summary>
        /// <param name="workerThread"></param>
        /// <param name="workItem"></param>
        /// <param name="workerThreadCall">是不是當前池內的線程調用該方法</param>
        /// <returns></returns>
        bool TryGetWorkerThreadAndWorkItem(out WorkerThread workerThread, out WorkItem workItem, bool workerThreadCall)
        {
            workerThread = null;
            workItem = null;

            if (this._workQueue.Count > 0)
            {
                if (this._freeTreads.Count > 0)
                {
                    workerThread = this._freeTreads.Dequeue();
                    workItem = this._workQueue.Dequeue();
                    this._workingTreads.Add(workerThread);

                    return true;
                }
                else
                {
                    if (this._allThreads.Count < this._threads)
                    {
                        workerThread = new WorkerThread();
                        workItem = this._workQueue.Dequeue();
                        this._allThreads.Add(workerThread);
                        this._workingTreads.Add(workerThread);

                        return true;
                    }
                    return false;
                }
            }
            else
            {
                if (!workerThreadCall)
                    return false;

                double t = this._keepAliveTime;
                if (t < 0)
                {
                    this._workQueue.TrimExcess();
                    return false;
                }

                //此代碼塊只有當前池內的線程完成工做了之後訪問到,從 QueueWorkItem 方法調用該方法是不會執行此代碼塊的,由於 this.workQueue.Count > 0
                if (this._freeTreads.Count == this._allThreads.Count && this._workingTreads.Count == 0 && this._freeTreads.Count > 0)
                {
                    /*
                     *能執行到這,說明池內沒有了任何任務,而且是最後一個活動線程執行完畢
                     *此時從池中取出一個線程來執行 Tick 方法
                     */
                    DateTime now = DateTime.Now;
                    int threadId = Thread.CurrentThread.ManagedThreadId;
                    if (this._allThreads.Any(a => a.ThreadId == threadId))//既然只有當前池內的線程能訪問到這,這句判斷是否是有點多餘了- -
                    {
                        workerThread = this._freeTreads.Dequeue();//彈出一個 WorkerThread 對象,此時不需將彈出的 WorkerThread 對象放入 workingTreads 隊列中,由於該對象是供池內自身計時用,相對外界是不知道的,保證外界調用 GetAvailableThreads 方法能獲得一個合理的結果
                        workItem = new WorkItem((state) =>
                       {
                           this.Tick((WorkerThread)state);
                       }, workerThread);

                        this._spin = true;
                        try
                        {
                            this._releaseTime = now.AddMilliseconds(t);//設置待釋放線程的時間點
                        }
                        catch (ArgumentOutOfRangeException)
                        {
                            this._releaseTime = DateTime.MaxValue;
                        }

                        return true;
                    }
                }

                return false;
            }
        }

        void ActivateWorkerThread(WorkerThread workerThread, WorkItem workItem)
        {
            workerThread.SetWork(workItem.Execute);
            workerThread.Complete += this.WorkComplete;
            workerThread.Activate();
        }
        void WorkComplete(WorkerThread workerThread)
        {
            //避免沒法調用終結器,務必將  this.WorkComplete 從 workerThread.Complete 中移除,取出 workerThread 的時候再加上
            workerThread.Complete -= this.WorkComplete;
            if (this._disposed)
                return;

            WorkerThread nextWorkerThread = null;
            WorkItem nextWorkItem = null;

            lock (this._lockObject)
            {
                if (this._disposed)
                    return;

                this._workingTreads.Remove(workerThread);
                this._freeTreads.Enqueue(workerThread);
                this.AdjustPool();

                if (!this.TryGetWorkerThreadAndWorkItem(out  nextWorkerThread, out  nextWorkItem, true))
                {
                    return;
                }
            }

            this.ActivateWorkerThread(nextWorkerThread, nextWorkItem);
        }

        /// <summary>
        /// 該方法必須在 locked 下執行
        /// </summary>
        void AdjustPool()
        {
            while (this._allThreads.Count > this._threads && this._freeTreads.Count > 0)
            {
                WorkerThread workerThread = this._freeTreads.Dequeue();
                this._allThreads.Remove(workerThread);
                workerThread.Dispose();
            }
        }

        /// <summary>
        /// 自旋計時,直到收到中止自旋的消息或者達到了釋放池內的線程時刻
        /// </summary>
        /// <param name="workerThread"></param>
        void Tick(WorkerThread workerThread)
        {
            DateTime releaseTimeTemp = this._releaseTime;
            while (true)
            {
                lock (this._lockObject)
                {
                    if (!this._spin)
                    {
                        //spin==false,即說明收到了中止計時的通知                        
                        break;
                    }
                    if (DateTime.Now >= releaseTimeTemp)
                    {
                        //此時中止計時,開始釋放線程,在釋放線程前先把以前加上的 WorkComplete 給取消,這樣就不會執行 WorkComplete 方法了,循環終止後 當前線程也天然執行完畢
                        workerThread.Complete -= this.WorkComplete;

                        //釋放全部的線程,也包括當前線程
                        this.ReleaseThreads();
                        this._spin = false;
                        //對於運行到這裏,由於全部的線程已經被釋放(包括當前線程),因此運行完這個方法,當前線程也天然運行結束了
                        break;
                    }
                }
                Thread.SpinWait(1);
            }
        }

        void ReleaseThreads()
        {
            ReleaseThreadsRecord releaseThreadsRecord = new ReleaseThreadsRecord(DateTime.Now);

            foreach (WorkerThread thread in this._allThreads)
            {
                try
                {
                    thread.Dispose();
                }
                catch
                {
                }

                releaseThreadsRecord.ThreadIds.Add(thread.ThreadId);
            }

            releaseThreadsRecord.ThreadIds.TrimExcess();
            this._releaseThreadsRecords.Add(releaseThreadsRecord);

            this._allThreads.Clear();
            this._freeTreads.Clear();
            this._workingTreads.Clear();
            this._workQueue.Clear();
            this._workQueue.TrimExcess();
        }

        void CheckDisposed()
        {
            if (this._disposed)
                throw new ObjectDisposedException(this.GetType().Name);
        }

        /// <summary>
        /// 銷燬當前對象資源,調用此方法會強制終止池內的全部線程
        /// </summary>
        public void Dispose()
        {
            if (this._disposed)
                return;

            //釋放全部線程
            lock (this._lockObject)
            {
                if (this._disposed)
                    return;
                this.ReleaseThreads();
                this._threads = 0;
                this._disposed = true;
            }

            this.Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="disposing"></param>
        protected virtual void Dispose(bool disposing)
        {
        }


        class WorkItem
        {
            bool _executed = false;
            WaitCallback _callback;
            object _state;
            public WorkItem(WaitCallback callback, object state)
            {
                if (callback == null)
                    throw new ArgumentNullException("callback");

                this._callback = callback;
                this._state = state;
            }

            public void Execute()
            {
                if (this._executed)
                    throw new Exception("當前 WorkItem 已經執行");

                this._executed = true;
                this._callback(this._state);
            }
        }

        public class ReleaseThreadsRecord
        {
            public ReleaseThreadsRecord()
            {
                this.ThreadIds = new List<int>();
            }
            public ReleaseThreadsRecord(DateTime releaseTime)
            {
                this.ReleaseTime = releaseTime;
                this.ThreadIds = new List<int>();
            }

            /// <summary>
            /// 釋放時間
            /// </summary>
            public DateTime ReleaseTime { get; set; }
            /// <summary>
            /// 被釋放線程的 Id 集合
            /// </summary>
            public List<int> ThreadIds { get; set; }

            public ReleaseThreadsRecord Clone()
            {
                ReleaseThreadsRecord record = new ReleaseThreadsRecord(this.ReleaseTime);
                record.ThreadIds = this.ThreadIds.ToList();

                return record;
            }
        }
    }
View Code

這個類代碼仍是挺多的(貌似有點佔篇幅- - ),雖然代碼裏都加上了註釋,但我仍是想給你們簡單說說其實現思路以及內部一些核心相關成員,方便你們更快的理解。ide

  • _threads:一個類型爲 int 的字段。表示當前池的大小
  • _allThreads:一個類型爲 List<WorkerThread> 的字段。用於存儲池內建立的全部 WorkerThread
  • _workingTreads:一個類型爲 List<WorkerThread> 的字段。用於存儲正在執行任務的 WorkerThread
  • _freeTreads:一個類型爲 Queue<WorkerThread> 的字段。用於存儲處於空閒狀態的 WorkerThread
  • _workQueue:一個類型爲 Queue<WorkItem> 的字段。用於存儲用戶往當前池裏塞的全部任務
  • SetPoolSize(int threads):設置線程池大小。這個方法主要作了兩件事:1.設置線程池大小,也就是字段 _threads 的值。2.調整線程池內線程。當設置的值小於當前池內的大小時,則釋放掉多出的空閒線程;當設置的值大於當前池大小時,若是 _workQueue 隊列有待處理的任務的話,會嘗試着建立新的 WorkerThread 去執行 _workQueue 隊列裏的任務,目的就是爲了使當前池一直處於滿負荷狀態。
  • bool QueueWorkItem(WaitCallback callback, object state):向線程池中添加任務。每次調用這個方法,都會將 callback 和 state 封裝成一個 WorkItem,而後將封裝的 WorkItem 對象放入 _workQueue 隊列。而後嘗試調用 TryGetWorkerThreadAndWorkItem 方法獲取可用的 WorkerThread 以及 _workQueue 隊列第一個任務,若是獲取成功(即有可用的 WorkerThread 和待處理的 WorkItem),就會將取出的 WorkItem 分配給取出的 WorkerThread 去執行。
  • bool TryGetWorkerThreadAndWorkItem(out WorkerThread workerThread, out WorkItem workItem, bool workerThreadCall):嘗試從池內取出一個處於空閒的 WorkerThread 和待處理的 WorkItem。這個方法的實現不是很複雜,若是池內有空閒的 WorkerThread 和待處理的 WorkItem,則返回 true,不然返回 false。目前咱們這個線程池內 WorkerThread 的建立不是伴隨線程池建立而建立,而是真正須要用到的時候纔會去建立。即當有任務往池裏塞的時候,首先會判斷 _freeTreads 集合內是否有空閒的 WorkerThread,若是有,則彈出一個空閒的 WorkerThread 去執行任務,同時將彈出的 WorkerThread 添加到 _workingTreads 集合中,沒有的話纔會去建立新的 WorkerThread 去執行任務,同時也會將新建的 WorkerThread 添加到 _workingTreads 集合中。
  • ActivateWorkerThread(WorkerThread workerThread, WorkItem workItem):這個方法體內的實現很簡單,就是將 workItem 分配給 workerThread,同時調用 workerThread.Activate() 激活線程執行任務,調用 workerThread.Activate()會將當前池內的方法 WorkComplete(WorkerThread workerThread) 綁定到 workerThread 定義的 Complete 事件上,每當 workerThread 執行完任務之後,都會觸發 workerThread.Complete 事件,以通知其所在的線程池。
  • WorkComplete(WorkerThread workerThread):每當 workerThread 執行完任務之後,都會調用該方法。該方法參數是一個 WorkerThread 對象,也就是說每一個 workerThread 執行完任務後都會將本身做爲參數調用這個方法。在這個方法內主要是作三件事:1.將執行完任務的 workerThread 從 _workingTreads 集合中移除,而後將 workerThread 添加到空閒線程隊列 _freeTreads 中。2.調整線程池線程(若是有必要的話),爲何在這要進行調整線程池呢?由於會出現這種狀況,好比當前線程池大小是 10,正在工做的線程爲 6 個,空閒線程也就是 4 個,這時候咱們調用 SetPoolSize(5),也就是將線程池大小設置爲 5,減小了線程池的容量,雖然在 SetPoolSize 方法內會調整了一遍線程池大小,但 SetPoolSize 方法內只會銷燬掉空閒的線程,也就是 4 個空閒線程會被銷燬,這時候池內其實仍是存在 6 個線程。因此還須要銷燬一個,這時候怎麼辦呢?不可能在 SetPoolSize 方法內把正在執行任務的線程給終止掉吧?所以,workerThread 每次執行完任務後都要執行一次調整線程池的操做,以保證池內的線程數量是正確的。3.調用 TryGetWorkerThreadAndWorkItem 方法,若是有待處理的任務的話,則繼續處理下一個任務,這樣就達到了持續處理 _workQueue 隊列內任務的目的。

上面就是 WorkerThreadPool 的一些核心字段以及方法,至於其它的成員就不作詳細說明了。 爲了方便管理,池內用了 _freeTreads 和 _workingTreads 兩個集合來維護池內線程狀態。因此每次從空閒線程 _freeTreads 取出 workerThread 執行任務的時候,都必須將 workerThread 添加到 _workingTreads 集合中;每一個 workerThread 執行完任務都會將本身從 _workingTreads 移除,同時將本身置爲空閒線程添加到 _freeTreads 集合中等待接受下一個任務來臨,因此 WorkComplete 方法體內最後都要調用 TryGetWorkerThreadAndWorkItem 方法獲取可用的 WorkerThread 以及一個待處理的任務,而後執行,這樣就造成了一個循環,只要有任務,池內就會一直處於滿負荷狀態。性能

開篇提到一個需求:沒有爬取任務的時候,須要減小甚至清空池內的全部線程,以避免池內線程一直掛着佔用系統資源。所以我給 IThreadPool 加了一個屬性:KeepAliveTime。經過這個屬性,能夠給線程池設定一個時間,即線程池在指定的時間內都沒有接收到任何任務,則會自行將池內的線程給銷燬。在 WorkerThreadPool 中這個功能的實現很簡單,在最後一個任務被執行完了之後,會自動從池內取出一個空閒的 workerThread 執行計時操做,也就是 WorkerThreadPool.Tick 方法,其實現也就是自旋計時,若是過了指定時間後都沒有接受到任務,則自動將池內的線程給銷燬。這個計時實現很簡陋- - ,技術有限,想不到其它好辦法了。學習

咱們的這個線程池設計簡單,功能不是很強,但很適合咱們如今的程序,至少讓我用的安心。目前已經在服務器上跑了一年半,一切都很正常。小程進入園子已有3年,在這麼好的平臺上小程一直都只知道汲取,卻從未想過回報。所以,我想給你們分享點東西,雖然這個 WorkerThreadPool 簡單,沒什麼高深的技術,但也算是個小結晶。若是你們有好建議,小程將萬分感謝!網站

第一次寫博,好緊張!最後,感謝博客園提供了這麼一個良好的學習平臺!

相關文章
相關標籤/搜索