程序員修仙之路-數據結構之設計一個高性能線程池

image

緣由排查

通過一個多小時的代碼排查終於查明瞭線上程序線程數過多的緣由:這是一個接收mq消息的一個服務,程序大致思路是這樣的,監聽的線程每次收到一條消息,就啓動一個線程去執行,每次啓動的線程都是新的。說到這裏,我們就談一談這個程序有哪些弊端呢:前端

  1. 每次收到一條消息都建立一個新的線程,要知道線程的資源對於系統來講是很昂貴的,消息處理完成還要銷燬這個線程。
  2. 這個程序用到的線程數量是沒有限制的。當線程到達必定數量,程序反而因線程在cpu切換開銷的緣由處理效率下降。不管的你的服務器cpu是多少核心,這個現象都有發生的可能。

解決問題

線程多的問題該怎麼解決呢,增長cpu核心數?治標不治本。對於開發者而言,最爲經常使用也最爲有效的是線程池化,也就是說線程池。後端

線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。這避免了在處理短期任務時建立與銷燬線程的代價。線程池不只可以保證內核的充分利用,還能防止過度調度。可用線程數量應該取決於可用的併發處理器、處理器內核、內存、網絡sockets等的數量。 例如,線程數通常取cpu數量+2比較合適,線程數過多會致使額外的線程切換開銷。

線程池其中一項很重要的技術點就是任務的隊列,隊列雖然屬於一種基礎的數據結構,可是發揮了舉足輕重的做用。數組

隊列

隊列是一種特殊的線性表,特殊之處在於它只容許在表的前端(front)進行刪除操做,而在表的後端(rear)進行插入操做,和棧同樣,隊列是一種操做受限制的線性表。進行插入操做的端稱爲隊尾,進行刪除操做的端稱爲隊頭。

隊列是一種採用的FIFO(first in first out)方式的線性表,也就是常常說的先進先出策略。
image服務器

實現
  1. 數組

隊列能夠用數組Q[1…m]來存儲,數組的上界m便是隊列所允許的最大容量。在隊列的運算中需設兩個指針:head,隊頭指針,指向實際隊頭元素+1的位置;tail,隊尾指針,指向實際隊尾元素位置。通常狀況下,兩個指針的初值設爲0,這時隊列爲空,沒有元素。如下爲一個簡單的實例(生產環境須要優化):網絡

public class QueueArray<T>
    {
        //隊列元素的數組容器
        T[] container = null;
        int IndexHeader, IndexTail;
        public QueueArray(int size)
        {
            container = new T[size];
            IndexHeader = 0;
            IndexTail = 0;
        }
        public void Enqueue(T item)
        {
            //入隊的元素放在頭指針的指向位置,而後頭指針前移
            container[IndexHeader] = item;
            IndexHeader++;
        }
        public T Dequeue()
        {
            //出隊:把尾元素指針指向的元素取出並清空(不清空也能夠)對應的位置,尾指針前移
            T item = container[IndexTail];
            container[IndexTail] = default(T);
            IndexTail++;
            return item;
        }

    }
  1. 鏈表

隊列採用的FIFO(first in first out),新元素老是被插入到鏈表的尾部,而讀取的時候老是從鏈表的頭部開始讀取。每次讀取一個元素,釋放一個元素。所謂的動態建立,動態釋放。於是也不存在溢出等問題。因爲鏈表由元素鏈接而成,遍歷也方便。如下是一個實例僅供參考:數據結構

public class QueueLinkList<T>
    {
        LinkedList<T> contianer = null;
        public QueueLinkList()
        {
            contianer = new LinkedList<T>();
        }
        public void Enqueue(T item)
        {
            //入隊的元素其實就是加入到隊尾
            contianer.AddLast(item);
        }
        public T Dequeue()
        {
            //出隊:取鏈表第一個元素,而後把這個元素刪除
            T item = contianer.First.Value;
            contianer.RemoveFirst();
            return item;
        }

    }
隊列擴展閱讀
  1. 隊列經過數組來實現的話有什麼問題嗎?是的。首先基於數組不可變本質的因素(具體可參考菜菜以前的文章),當一個隊列的元素把數組沾滿的時候,數組擴容是有性能問題的,數組的擴容過程不僅是開闢新空間分配內存那麼簡單,還要有數組元素的copy過程,更可怕的是會給GC形成極大的壓力。若是數組比較小可能影響比較小,可是當一個數組比較大的時候,好比佔用500M內存的一個數組,數據copy其實會形成比較大的性能損失。
  2. 隊列經過數組來實現,隨着頭指針和尾指針的位置移動,尾指針最終會指向第一個元素的位置,也就是說沒有元素能夠出隊了,其實要解決這個問題有兩種方式,其一:在出隊或者入隊的過程當中不斷的移動全部元素的位置,避免上邊所說的極端狀況發生;其二:能夠把數組的首尾元素鏈接起來,使其成爲一個環狀,也就是常常說的循環隊列。
  3. 隊列在一些特殊場景下其實還有一些變種,好比說循環隊列,阻塞隊列,併發隊列等,有興趣的同窗能夠去研究一下,這裏不在展開討論。這裏說到阻塞隊列就多說一句,其實用阻塞隊列能夠實現一個最基本的生產者消費者模式。
  4. 當隊列用鏈表方式實現的時候,因爲鏈表的首尾操做時間複雜度都是O(1),並且沒有空間大小的限制,因此通常的隊列用鏈表實現更簡單。
  5. 當隊列中無元素可出隊或者沒有空間可入隊的時候,是阻塞當前的操做仍是返回錯誤信息,取決於在座各位隊列的設計者了。

簡單實用的線程池

//線程池
    public class ThreadPool
    {
        bool PoolEnable = false; //線程池是否可用 
        List<Thread> ThreadContainer = null; //線程的容器
        ConcurrentQueue<ActionData> JobContainer = null; //任務的容器
        public ThreadPool(int threadNumber)
        {
            PoolEnable = true;
            ThreadContainer = new List<Thread>(threadNumber);
            JobContainer = new ConcurrentQueue<ActionData>();
            for (int i = 0; i < threadNumber; i++)
            {
                var t = new Thread(RunJob);
                ThreadContainer.Add(t);
                t.Start();
            }           
        }
        //向線程池添加一個任務
        public void AddTask(Action<object> job,object obj, Action<Exception> errorCallBack=null)
        {
            if (JobContainer != null)
            {
                JobContainer.Enqueue(new ActionData { Job = job, Data = obj , ErrorCallBack= errorCallBack });
            }
          
        }
        //終止線程池
        public void FinalPool()
        {
            PoolEnable = false;
            JobContainer = null;
            if (ThreadContainer != null)
            {
                foreach (var t in ThreadContainer)
                {
                    //強制線程退出並很差,會有異常
                    //t.Abort();
                    t.Join();                    
                }
                ThreadContainer = null;
            }

        }
        private  void RunJob()
        {
            while (true&& JobContainer!=null&& PoolEnable)
            {
                //任務列表取任務
                ActionData job=null;
                JobContainer?.TryDequeue(out job);
                if (job == null)
                {
                    //若是沒有任務則休眠
                    Thread.Sleep(10);
                    continue;
                }
                try
                {
                    //執行任務
                    job.Job.Invoke(job.Data);
                }
                catch(Exception error)
                {
                    //異常回調
                    job?.ErrorCallBack(error);
                }
            }
        }
    }

    public class ActionData
    {
        //執行任務的參數
        public object Data { get; set; }
        //執行的任務
        public Action<object> Job { get; set; }
        //發生異常時候的回調方法
        public Action<Exception> ErrorCallBack { get; set; }
    }

使用

ThreadPool pool = new ThreadPool(100);
            for (int i = 0; i < 5000; i++)
            {
                pool.AddTask((obj) =>
                {
                    Console.WriteLine($"{obj}__{System.Threading.Thread.CurrentThread.ManagedThreadId}");
                }, i, (e) =>
                {
                    Console.WriteLine(e.Message);
                });
            }
            pool.FinalPool();
            Console.Read();

添加關注,查看更精美版本,收穫更多精彩

image

相關文章
相關標籤/搜索