通過一個多小時的代碼排查終於查明瞭線上程序線程數過多的緣由:這是一個接收mq消息的一個服務,程序大致思路是這樣的,監聽的線程每次收到一條消息,就啓動一個線程去執行,每次啓動的線程都是新的。說到這裏,我們就談一談這個程序有哪些弊端呢:前端
線程多的問題該怎麼解決呢,增長cpu核心數?治標不治本。對於開發者而言,最爲經常使用也最爲有效的是線程池化,也就是說線程池。後端
線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。這避免了在處理短期任務時建立與銷燬線程的代價。線程池不只可以保證內核的充分利用,還能防止過度調度。可用線程數量應該取決於可用的併發處理器、處理器內核、內存、網絡sockets等的數量。 例如,線程數通常取cpu數量+2比較合適,線程數過多會致使額外的線程切換開銷。數組
線程池其中一項很重要的技術點就是任務的隊列,隊列雖然屬於一種基礎的數據結構,可是發揮了舉足輕重的做用。bash
隊列是一種特殊的線性表,特殊之處在於它只容許在表的前端(front)進行刪除操做,而在表的後端(rear)進行插入操做,和棧同樣,隊列是一種操做受限制的線性表。進行插入操做的端稱爲隊尾,進行刪除操做的端稱爲隊頭。服務器
隊列是一種採用的FIFO(first in first out)方式的線性表,也就是常常說的先進先出策略。 網絡
public class QueueArray<T>
{
//隊列元素的數組容器
T[] container = null;
int IndexHeader, IndexTail;
public QueueArray(int size)
{
container = new T[size];
IndexHeader = 0;
IndexTail = 0;
}
public void Enqueue(T item)
{
//入隊的元素放在頭指針的指向位置,而後頭指針前移
container[IndexHeader] = item;
IndexHeader++;
}
public T Dequeue()
{
//出隊:把尾元素指針指向的元素取出並清空(不清空也能夠)對應的位置,尾指針前移
T item = container[IndexTail];
container[IndexTail] = default(T);
IndexTail++;
return item;
}
}
複製代碼
public class QueueLinkList<T>
{
LinkedList<T> contianer = null;
public QueueLinkList()
{
contianer = new LinkedList<T>();
}
public void Enqueue(T item)
{
//入隊的元素其實就是加入到隊尾
contianer.AddLast(item);
}
public T Dequeue()
{
//出隊:取鏈表第一個元素,而後把這個元素刪除
T item = contianer.First.Value;
contianer.RemoveFirst();
return item;
}
}
複製代碼
隊列經過數組來實現的話有什麼問題嗎?是的。首先基於數組不可變本質的因素(具體可參考菜菜以前的文章),當一個隊列的元素把數組沾滿的時候,數組擴容是有性能問題的,數組的擴容過程不僅是開闢新空間分配內存那麼簡單,還要有數組元素的copy過程,更可怕的是會給GC形成極大的壓力。若是數組比較小可能影響比較小,可是當一個數組比較大的時候,好比佔用500M內存的一個數組,數據copy其實會形成比較大的性能損失。數據結構
隊列經過數組來實現,隨着頭指針和尾指針的位置移動,尾指針最終會指向第一個元素的位置,也就是說沒有元素能夠出隊了,其實要解決這個問題有兩種方式,其一:在出隊或者入隊的過程當中不斷的移動全部元素的位置,避免上邊所說的極端狀況發生;其二:能夠把數組的首尾元素鏈接起來,使其成爲一個環狀,也就是常常說的循環隊列。多線程
隊列在一些特殊場景下其實還有一些變種,好比說循環隊列,阻塞隊列,併發隊列等,有興趣的同窗能夠去研究一下,這裏不在展開討論。這裏說到阻塞隊列就多說一句,其實用阻塞隊列能夠實現一個最基本的生產者消費者模式。併發
當隊列用鏈表方式實現的時候,因爲鏈表的首尾操做時間複雜度都是O(1),並且沒有空間大小的限制,因此通常的隊列用鏈表實現更簡單。socket
當隊列中無元素可出隊或者沒有空間可入隊的時候,是阻塞當前的操做仍是返回錯誤信息,取決於在座各位隊列的設計者了。
//線程池
public class ThreadPool
{
bool PoolEnable = false; //線程池是否可用
List<Thread> ThreadContainer = null; //線程的容器
ConcurrentQueue<ActionData> JobContainer = null; //任務的容器
public ThreadPool(int threadNumber)
{
PoolEnable = true;
ThreadContainer = new List<Thread>(threadNumber);
JobContainer = new ConcurrentQueue<ActionData>();
for (int i = 0; i < threadNumber; i++)
{
var t = new Thread(RunJob);
ThreadContainer.Add(t);
t.Start();
}
}
//向線程池添加一個任務
public void AddTask(Action<object> job,object obj, Action<Exception> errorCallBack=null)
{
if (JobContainer != null)
{
JobContainer.Enqueue(new ActionData { Job = job, Data = obj , ErrorCallBack= errorCallBack });
}
}
//終止線程池
public void FinalPool()
{
PoolEnable = false;
JobContainer = null;
if (ThreadContainer != null)
{
foreach (var t in ThreadContainer)
{
//強制線程退出並很差,會有異常
//t.Abort();
t.Join();
}
ThreadContainer = null;
}
}
private void RunJob()
{
while (true&& JobContainer!=null&& PoolEnable)
{
//任務列表取任務
ActionData job=null;
JobContainer?.TryDequeue(out job);
if (job == null)
{
//若是沒有任務則休眠
Thread.Sleep(10);
continue;
}
try
{
//執行任務
job.Job.Invoke(job.Data);
}
catch(Exception error)
{
//異常回調
job?.ErrorCallBack(error);
}
}
}
}
public class ActionData
{
//執行任務的參數
public object Data { get; set; }
//執行的任務
public Action<object> Job { get; set; }
//發生異常時候的回調方法
public Action<Exception> ErrorCallBack { get; set; }
}
複製代碼
ThreadPool pool = new ThreadPool(100);
for (int i = 0; i < 5000; i++)
{
pool.AddTask((obj) =>
{
Console.WriteLine($"{obj}__{System.Threading.Thread.CurrentThread.ManagedThreadId}");
}, i, (e) =>
{
Console.WriteLine(e.Message);
});
}
pool.FinalPool();
Console.Read();
複製代碼
添加關注,查看更精美版本,收穫更多精彩