在web應用中,單個進程或者機器的響應速度有限,相似大量數據導入導出的操做的數量若是不加限制,會致使服務器cpu被吃滿,致使其餘一些很簡單的請求沒法及時響應的問題。針對這個限制提出了以下要求。
1. 先到達的請求先執行: 先入先出原則
2. 只能同時執行若干請求:避免cpu被吃滿
3. 異步執行:若是長時間執行會長期佔用iis的工做線程web
基於上述的要求我設計了一個隊列。這個隊列咱們須要稍微提一個組件,ParallelExtensionsExtrasapi
這是微軟提供的一個線程的擴展,具體的自行搜索下相關資料,這裏開始的時候我並無用這個組件,而是本身對task的封裝,可是實際上task仍是利用的線程池,線程池默認的線程數10個,並不能知足某些場景對多個現成的要求,因而在漫長的搜索過程當中才發現了這個組建。緩存
這裏我主要用到兩個對TaskScheduler的擴展安全
QueuedTaskScheduler:對task進行排隊執行,執行時在Thread環境中,而且能夠控制線程的數量,相似自定義的線程池。服務器
ThreadPerTaskScheduler: 顧名思義就是在線程中執行每一個task。session
針對兩種區別在於,QueuedTaskScheduler 的線程是可複用的,在線程數量固定的狀況下推薦使用。多線程
ThreadPerTaskScheduler 只建立線程而不進行銷燬,每次執行一個task都是使用一個new一個thread執行。適用於在某些線程數量動態變化的狀況。併發
下面是實現代碼:app
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Configuration; using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Schedulers; using Coralcode.Framework.Data; using Coralcode.Framework.Extensions; using ThreadState = System.Threading.ThreadState; namespace Coralcode.Framework.Task { /// <summary> /// 線程隊列 /// 注意若是使用ScheduleType.Defaut 存在線程池數量限制 /// </summary> public class TaskQueue { public static readonly TaskQueue Instance = new TaskQueue(1, ScheduleType.Thread); /// <summary> /// 獲取隊列 /// </summary> /// <param name="maxCount">最大併發數</param> /// <param name="type">執行計劃</param> /// <returns></returns> public static TaskQueue GetQueue(int maxCount,ScheduleType type=ScheduleType.Thread) { return new TaskQueue(maxCount, type); } /// <summary> /// 獲取隊列 /// </summary> /// <param name="maxCount">最大併發數</param> /// <param name="name">隊列名稱</param> /// <param name="type">執行計劃</param> /// <returns></returns> public static TaskQueue GetQueue(int maxCount, string name, ScheduleType type = ScheduleType.Thread) { return new TaskQueue(maxCount, name, type); } /// <summary> /// 獲取隊列 /// </summary> /// <param name="maxCount">最大併發數</param> /// <param name="name">隊列名稱</param> /// <param name="token">取消令牌</param> /// <param name="type">執行計劃</param> public static TaskQueue GetQueue(int maxCount, string name, CancellationToken token, ScheduleType type = ScheduleType.Thread) { return new TaskQueue(maxCount, name, token, type); } /// <summary> /// 獲取隊列 /// </summary> /// <param name="maxCount">最大併發數</param> /// <param name="token">取消令牌</param> /// <param name="type">執行計劃</param> public static TaskQueue GetQueue(int maxCount, CancellationToken token, ScheduleType type = ScheduleType.Thread) { return new TaskQueue(maxCount, IdentityGenerator.NewGuidString(), token, type); } private ConcurrentQueue<System.Threading.Tasks.Task> _tasks; private readonly int _limitedTaskCount; private int _runningTaskCount; private Thread _mainExcuteThread; private CancellationToken _token; private TaskScheduler _scheduler; private TaskQueue(int maxCount, ScheduleType type) : this(maxCount, IdentityGenerator.NewGuidString(), type) { } private TaskQueue(int maxCount, string name, ScheduleType type) : this(maxCount, name, CancellationToken.None, type) { } private TaskQueue(int maxCount, string name, CancellationToken token, ScheduleType type) { _limitedTaskCount = maxCount; _tasks = new ConcurrentQueue<System.Threading.Tasks.Task>(); Name = name; _token = token; switch (type) { case ScheduleType.Default: _scheduler = new QueuedTaskScheduler(maxCount); break; case ScheduleType.Thread: _scheduler = new ThreadPerTaskScheduler(); break; default: throw new ArgumentOutOfRangeException(nameof(type), type, null); } } /// <summary> /// 執行不帶返回結果的方法 /// </summary> /// <param name="func"></param> /// <returns></returns> public System.Threading.Tasks.Task Execute(Action func) { var task = new System.Threading.Tasks.Task(func, _token); _tasks.Enqueue(task); if (_mainExcuteThread == null || _mainExcuteThread.ThreadState.HasFlag(ThreadState.Stopped)) { _mainExcuteThread?.DisableComObjectEagerCleanup(); _mainExcuteThread = new Thread(NotifyThreadPendingWork); _mainExcuteThread.Start(); } return task; } /// <summary> /// 執行帶返回結果的方法 /// </summary> /// <typeparam name="TResult"></typeparam> /// <param name="func"></param> /// <returns></returns> [MethodImpl(MethodImplOptions.Synchronized)] public Task<TResult> Execute<TResult>(Func<TResult> func) { var task = new Task<TResult>(func, _token); _tasks.Enqueue(task); if (_mainExcuteThread == null || _mainExcuteThread.ThreadState.HasFlag(ThreadState.Stopped)) { _mainExcuteThread?.DisableComObjectEagerCleanup(); _mainExcuteThread = new Thread(NotifyThreadPendingWork); _mainExcuteThread.Priority = ThreadPriority.Highest; _mainExcuteThread.Start(); } return task; } private void NotifyThreadPendingWork() { try { while (true) { if (_token.IsCancellationRequested) { _tasks = new ConcurrentQueue<System.Threading.Tasks.Task>(); break; } System.Threading.Tasks.Task task; if (!_tasks.TryDequeue(out task)) break; task.Start(_scheduler); Interlocked.Increment(ref _runningTaskCount); task.ContinueWith(item => { Interlocked.Decrement(ref _runningTaskCount); }); //Debug.WriteLine("隊列{0},容許執行 {1} 條,等待線程爲 {2} ,執行中 {3} 條,時間爲 {4} ", _name, _limitedTaskCount, _tasks.Count, _runningTaskCount, DateTime.Now); while (_runningTaskCount >= _limitedTaskCount) { Thread.Sleep(500); } } } finally { _runningTaskCount = 0; } } /// <summary> /// 線程隊列的名字 /// </summary> public string Name { get; } /// <summary> /// 根據返回的task的id獲取到當前task排隊的位置 /// </summary> /// <param name="taskId"></param> /// <returns>返回-1表示正在執行,或者task沒有加進去,返回大於等於0則表示其順序</returns> public int GetCurrentTaskIndex(int taskId) { lock (_tasks) { return _tasks.IndexOf(item => item.Id == taskId); } } /// <summary> /// 等待執行的線程數量 /// </summary> public int WaitingTaskCount => _tasks.Count; /// <summary> /// 正在執行的線程數量 /// </summary> public int RunningTaskCount => _runningTaskCount; /// <summary> /// 併發數 /// </summary> public int LimitedTaskCount => _limitedTaskCount; } /// <summary> /// /// </summary> public enum ScheduleType { /// <summary> /// 線程池,默認方式 /// </summary> Default, /// <summary> /// 自定義線程,ThreadPerTaskScheduler /// </summary> Thread } }
重點部分說明
1. ConcurrentQueue 自己在處理多線程環境因此採用線程安全的隊列。
2. ContinueWith 在任務執行完畢以後須要對執行的數量-1。
3. NotifyThreadPendingWork 這裏就是啓動另一個主線程來對任務進行分發。
4. 主線程分發來知足異步的需求。
5. CancellationToken 提供取消執行的功能。
6. 能夠採用默認也能夠本身實例化,而默認是一個線程,即考慮經常使用狀況,也提供擴展的功能。異步
測試代碼
寫出來這個隊列的部分可能只用了1小時,可是寫測試代碼和調試用了差很少半天時間才搞定。
線程的測試在單元測試中一直是難以控制的,在這個case中多個線程併發的狀況下,實時獲取排隊數量,執行中數量也是個很難測試的部分。
下面是測試代碼:
using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using Coralcode.Framework.Task; using Coralcode.Framework.Utils; using iTextSharp.text; using Microsoft.VisualStudio.TestTools.UnitTesting; namespace FrameworkTest.Task { [TestClass] public class TaskQueueTest { [TestMethod] public void TwoTimesExcuteWithFreeBetweenTwoTimsTest() { var wantExecuteTaskCount = 10; var maxCanExcuteTaskCount = 1; var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount); var addList = new List<int>(); var executedList = new List<int>(); for (int i = 0; i < wantExecuteTaskCount; i++) { addList.Add(i); } /* * 測試原理: * 添加一批任務,等待執行完成之後在執行一次 * 第二次也能執行完畢,無報錯的話認爲, * 隊列在執行完畢後自動暫停,等有新任務進來的時候能夠從新啓動 */ addList.ForEach(index => { var list = executedList; queue.Execute(() => { var item = index / maxCanExcuteTaskCount; list.Add(item); return true; }); }); while (addList.Count != executedList.Count) { Thread.Sleep(1000); } Assert.AreEqual(queue.WaitingTaskCount, 0); executedList.Clear(); for (int i = 0; i < wantExecuteTaskCount; i++) { addList.Add(i); } addList.ForEach(index => { var list = executedList; queue.Execute(() => { var item = index / maxCanExcuteTaskCount; list.Add(item); return true; }); }); while (addList.Count != executedList.Count) { Thread.Sleep(1000); } Assert.AreEqual(queue.WaitingTaskCount, 0); } [TestMethod] public void ExecuteAsAddSequenceTest() { var wantExecuteTaskCount = 10; var maxCanExcuteTaskCount = 1; var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount); var addList = new List<int>(); var executedList = new List<int>(); for (int i = 0; i < wantExecuteTaskCount; i++) { addList.Add(i); } /* * 測試原理: * 按需添加,而後記錄執行,結果也必須是順序的 * 若是一次執行多條,那麼多條被認爲同一個批次 * 同一個批次的順序應該是一致的 */ addList.ForEach(index => { var list = executedList; queue.Execute(() => { var item = index / maxCanExcuteTaskCount; list.Add(item); return true; }); }); while (addList.Count != executedList.Count) { Thread.Sleep(1000); } addList = addList.Select(item => item / maxCanExcuteTaskCount).Distinct().ToList(); executedList = executedList.Distinct().ToList(); for (int i = 0; i < addList.Count; i++) { Assert.AreEqual(addList[i], executedList[i]); } } [TestMethod] public void WaitCountTest() { var wantExecuteTaskCount = 10; var maxCanExecuteTaskCount = 2; var queue = TaskQueue.GetQueue(maxCanExecuteTaskCount); var executedOrExecutingCount = 0; /* * 測試原理: * 記錄執行了的數量 * 隊列中等待的數量 =最開始放入的數量-隊列運行執行的最大線程數-已經執行的數量 * 當全部的線程都在被執行時,隊列中等待數量爲0,可是還未執行完的話,直接用0來判斷 */ var taskList = new List<System.Threading.Tasks.Task>(); for (int i = 0; i < wantExecuteTaskCount; i++) { var task = queue.Execute(() => { Thread.Sleep(2000); Debug.WriteLine("已執行的數量" + executedOrExecutingCount); Interlocked.Increment(ref executedOrExecutingCount); }); taskList.Add(task); } // ReSharper disable once LoopVariableIsNeverChangedInsideLoop while (executedOrExecutingCount < wantExecuteTaskCount) { Thread.Sleep(700); var queueWaitCount = queue.WaitingTaskCount; var wantWaitCount = wantExecuteTaskCount - maxCanExecuteTaskCount - executedOrExecutingCount; Debug.WriteLine("************"); Debug.WriteLine("隊列等待執行的數量" + queueWaitCount); Debug.WriteLine("隊列指望等待執行的數量" + wantWaitCount); Debug.WriteLine("************"); if (wantWaitCount < 0) Assert.AreEqual(queueWaitCount, 0); else Assert.AreEqual(queueWaitCount, wantWaitCount); } } [TestMethod] public void WaitTest() { //System.Threading.Tasks.Task.Factory.StartNew(() => //{ // try // { // var str = HttpUtil.Get<string>("http://172.16.2.3:10004", // "api/Acl/PermissionAuthorityApi/GetPermissionCountByApp?app=1"); // } // catch (Exception e) // { // Console.WriteLine(e); // throw; // } //}); var wantExecuteTaskCount = 10; var maxCanExcuteTaskCount = 1; var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount); var addList = new List<int>(); var executedList = new List<int>(); for (int i = 0; i < wantExecuteTaskCount; i++) { addList.Add(i); } var tasks = new List<System.Threading.Tasks.Task>(); /* * 測試原理: * 執行異步請求看最後是否成功執行 */ addList.ForEach(index => { var list = executedList; var task = queue.Execute(() => { try { var str = HttpUtil.Get<string>("http://172.16.2.3:10004", "api/Acl/PermissionAuthorityApi/GetPermissionCountByApp?app=1"); } catch (Exception e) { Console.WriteLine(e); } var item = index / maxCanExcuteTaskCount; list.Add(item); }); tasks.Add(task); }); System.Threading.Tasks.Task.WaitAll(tasks.ToArray()); //System.Threading.Tasks.Task.WaitAll(tasks.ToArray()); addList = addList.Select(item => item / maxCanExcuteTaskCount).Distinct().ToList(); executedList = executedList.Distinct().ToList(); for (int i = 0; i < addList.Count; i++) { Assert.AreEqual(addList[i], executedList[i]); } } [TestMethod] public void ExceptionTestTest() { var wantExecuteTaskCount = 10; var maxCanExecuteTaskCount = 2; var queue = TaskQueue.GetQueue(maxCanExecuteTaskCount); var executedOrExecutingCount = 0; /* * 發現部分狀況下隊列裏面的異常檢測不到 * 這裏作了測試 */ queue.Execute(() => { try { throw new Exception("測試異常"); } catch (Exception ex) { Assert.IsNotNull(ex); } }); } [TestMethod] public void GetCurrentTaskIndexTeset() { var wantExecuteTaskCount = 10; var maxCount = 10; var waitSeconds = 1000; var queue = TaskQueue.GetQueue(maxCount); List< System.Threading.Tasks.Task> executeingTasks = new List<System.Threading.Tasks.Task>(); /* * 測試方法: * 假設併發爲10,那麼首先塞進去10個線程執行, * 那麼後面的線程都在等待, * 在放進去n個,則後面的正在排隊,返回的位置和放進去的位置一致 * 而以前放進去的10個都在執行,返回-1 */ for (int i = 0; i < maxCount; i++) { var task= queue.Execute(() => { try { Thread.Sleep(waitSeconds * 1000); } catch (Exception e) { Console.WriteLine(e); } }); executeingTasks.Add(task); } Dictionary<int,System.Threading.Tasks.Task> tasks = new Dictionary<int, System.Threading.Tasks.Task>(); for (int i = 0; i < wantExecuteTaskCount; i++) { var task = queue.Execute(() => { try { Thread.Sleep(waitSeconds*1000); } catch (Exception e) { Console.WriteLine(e); } }); tasks.Add(i,task); } foreach (var task in executeingTasks) { Assert.AreEqual(queue.GetCurrentTaskIndex(task.Id), -1); } foreach (var task in tasks) { Assert.AreEqual(queue.GetCurrentTaskIndex(task.Value.Id),task.Key); } } } }
具體的代碼中都有每一個測試用例的設計思考。
注意事項
注意事項就是我在這裏踩過的坑
Session丟失
在web應用中session的存儲實際以線程爲基礎,相似ThreadLocal實現的線程隔離和靜態使用。
這裏再執行以前須要預先取出來要使用的對象而後再在action/func中使用
DbContext
1. 在ef中若是獲取一個dbcontext在另一個線程中保存就會報錯。
2. 若是一個從ef取出來的對象從一個線程傳遞到另一個線程,修改提交就會報錯。
3. 若是主線程和隊列中線程同時操做例如一個讀一個寫,或者同時寫,此時也會報錯
Ioc的問題
若是ioc採用prethread或者preresolve這兩種方式來管理生命週期,理論上在http請求結束的時候都要對線程中使用的對象進行釋放,
由於隊列爲異步那麼當請求結束時候線程卻還在執行,此時就會出現空指針的問題,或者ef中對象已經釋放。
最佳實踐
/// <summary> /// 調用示例 /// /// </summary> /// <typeparam name="TService"></typeparam> public static void AsyncExecuteServiceAction<TService>(this TService service,string actionName,params object[] parameters) where TService:CoralService { TaskQueue.Instance.Execute(() => { using (var newservice = UnityService.Resolve<TService>()) { newservice.InitContext(service.AppContext,service.UserContext,service.SessionContext,service.PageContext); newservice.ExecuteMethod<object>(actionName, parameters); } }); }
1. service.AppContext,service.UserContext,service.SessionContext,service.PageContext 這是四種不一樣級別的緩存,理解爲session
2. using 的目的就是爲了釋放對象
3. 反射參數的方式只是爲了方便調用
總結
1. 代碼不難,直接拷貝就可使用,單元測試也是同樣
2. TPL我以爲是在.net技術中很好用的一部分,須要熟悉
3. 線程中有不少對象,鎖等問題,這個要隨着經驗不斷的挖坑填坑來增加經驗
展望
1. 前面說過兩種線程計劃,我這裏放入prethread是在某個工具中有壓力測試的部分,須要以最快的速度來建立線程,因此沒有采用複用的方式,具體後面有一個接口測試工具的系列文章來介紹。2. 這裏說的線程隊列,和前面動態類型序列化是後面一個做業調度系統的基礎,這部分還在設計和實現中,預計月底能夠在文章中和你們見面。3. 其實上一篇和這一篇都是在爲做業系統作鋪墊。4. 下一篇將介紹一個定時執行任務的設計。5. 雖然我一直以爲設計是最重要的,實現其次,可是要落地仍是要依賴於實現,因此這些基本的組件和幫助類仍是須要的。