通過一個多小時的代碼排查終於查明瞭線上程序線程數過多的緣由:這是一個接收mq消息的一個服務,程序大致思路是這樣的,監聽的線程每次收到一條消息,就啓動一個線程去執行,每次啓動的線程都是新的。說到這裏,我們就談一談這個程序有哪些弊端呢:前端
線程多的問題該怎麼解決呢,增長cpu核心數?治標不治本。對於開發者而言,最爲經常使用也最爲有效的是線程池化,也就是說線程池。後端
線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。這避免了在處理短期任務時建立與銷燬線程的代價。線程池不只可以保證內核的充分利用,還能防止過度調度。可用線程數量應該取決於可用的併發處理器、處理器內核、內存、網絡sockets等的數量。 例如,線程數通常取cpu數量+2比較合適,線程數過多會致使額外的線程切換開銷。
線程池其中一項很重要的技術點就是任務的隊列,隊列雖然屬於一種基礎的數據結構,可是發揮了舉足輕重的做用。數組
隊列是一種特殊的線性表,特殊之處在於它只容許在表的前端(front)進行刪除操做,而在表的後端(rear)進行插入操做,和棧同樣,隊列是一種操做受限制的線性表。進行插入操做的端稱爲隊尾,進行刪除操做的端稱爲隊頭。
隊列是一種採用的FIFO(first in first out)方式的線性表,也就是常常說的先進先出策略。服務器
隊列能夠用數組Q[1…m]來存儲,數組的上界m便是隊列所允許的最大容量。在隊列的運算中需設兩個指針:head,隊頭指針,指向實際隊頭元素+1的位置;tail,隊尾指針,指向實際隊尾元素位置。通常狀況下,兩個指針的初值設爲0,這時隊列爲空,沒有元素。如下爲一個簡單的實例(生產環境須要優化):網絡
public class QueueArray<T> { //隊列元素的數組容器 T[] container = null; int IndexHeader, IndexTail; public QueueArray(int size) { container = new T[size]; IndexHeader = 0; IndexTail = 0; } public void Enqueue(T item) { //入隊的元素放在頭指針的指向位置,而後頭指針前移 container[IndexHeader] = item; IndexHeader++; } public T Dequeue() { //出隊:把尾元素指針指向的元素取出並清空(不清空也能夠)對應的位置,尾指針前移 T item = container[IndexTail]; container[IndexTail] = default(T); IndexTail++; return item; } }
隊列採用的FIFO(first in first out),新元素老是被插入到鏈表的尾部,而讀取的時候老是從鏈表的頭部開始讀取。每次讀取一個元素,釋放一個元素。所謂的動態建立,動態釋放。於是也不存在溢出等問題。因爲鏈表由元素鏈接而成,遍歷也方便。如下是一個實例僅供參考:數據結構
public class QueueLinkList<T> { LinkedList<T> contianer = null; public QueueLinkList() { contianer = new LinkedList<T>(); } public void Enqueue(T item) { //入隊的元素其實就是加入到隊尾 contianer.AddLast(item); } public T Dequeue() { //出隊:取鏈表第一個元素,而後把這個元素刪除 T item = contianer.First.Value; contianer.RemoveFirst(); return item; } }
//線程池 public class ThreadPool { bool PoolEnable = false; //線程池是否可用 List<Thread> ThreadContainer = null; //線程的容器 ConcurrentQueue<ActionData> JobContainer = null; //任務的容器 public ThreadPool(int threadNumber) { PoolEnable = true; ThreadContainer = new List<Thread>(threadNumber); JobContainer = new ConcurrentQueue<ActionData>(); for (int i = 0; i < threadNumber; i++) { var t = new Thread(RunJob); ThreadContainer.Add(t); t.Start(); } } //向線程池添加一個任務 public void AddTask(Action<object> job,object obj, Action<Exception> errorCallBack=null) { if (JobContainer != null) { JobContainer.Enqueue(new ActionData { Job = job, Data = obj , ErrorCallBack= errorCallBack }); } } //終止線程池 public void FinalPool() { PoolEnable = false; JobContainer = null; if (ThreadContainer != null) { foreach (var t in ThreadContainer) { //強制線程退出並很差,會有異常 //t.Abort(); t.Join(); } ThreadContainer = null; } } private void RunJob() { while (true&& JobContainer!=null&& PoolEnable) { //任務列表取任務 ActionData job=null; JobContainer?.TryDequeue(out job); if (job == null) { //若是沒有任務則休眠 Thread.Sleep(10); continue; } try { //執行任務 job.Job.Invoke(job.Data); } catch(Exception error) { //異常回調 job?.ErrorCallBack(error); } } } } public class ActionData { //執行任務的參數 public object Data { get; set; } //執行的任務 public Action<object> Job { get; set; } //發生異常時候的回調方法 public Action<Exception> ErrorCallBack { get; set; } }
ThreadPool pool = new ThreadPool(100); for (int i = 0; i < 5000; i++) { pool.AddTask((obj) => { Console.WriteLine($"{obj}__{System.Threading.Thread.CurrentThread.ManagedThreadId}"); }, i, (e) => { Console.WriteLine(e.Message); }); } pool.FinalPool(); Console.Read();
添加關注,查看更精美版本,收穫更多精彩