線程隊列

在web應用中,單個進程或者機器的響應速度有限,相似大量數據導入導出的操做的數量若是不加限制,會致使服務器cpu被吃滿,致使其餘一些很簡單的請求沒法及時響應的問題。針對這個限制提出了以下要求。
1. 先到達的請求先執行: 先入先出原則
2. 只能同時執行若干請求:避免cpu被吃滿
3. 異步執行:若是長時間執行會長期佔用iis的工做線程web

基於上述的要求我設計了一個隊列。這個隊列咱們須要稍微提一個組件,ParallelExtensionsExtrasapi

這是微軟提供的一個線程的擴展,具體的自行搜索下相關資料,這裏開始的時候我並無用這個組件,而是本身對task的封裝,可是實際上task仍是利用的線程池,線程池默認的線程數10個,並不能知足某些場景對多個現成的要求,因而在漫長的搜索過程當中才發現了這個組建。緩存

這裏我主要用到兩個對TaskScheduler的擴展安全

QueuedTaskScheduler:對task進行排隊執行,執行時在Thread環境中,而且能夠控制線程的數量,相似自定義的線程池。服務器

ThreadPerTaskScheduler: 顧名思義就是在線程中執行每一個task。session

針對兩種區別在於,QueuedTaskScheduler 的線程是可複用的,在線程數量固定的狀況下推薦使用。多線程

ThreadPerTaskScheduler 只建立線程而不進行銷燬,每次執行一個task都是使用一個new一個thread執行。適用於在某些線程數量動態變化的狀況。併發

下面是實現代碼:app

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Schedulers;
using Coralcode.Framework.Data;
using Coralcode.Framework.Extensions;
using ThreadState = System.Threading.ThreadState;

namespace Coralcode.Framework.Task
{
    /// <summary>
    /// 線程隊列
    /// 注意若是使用ScheduleType.Defaut 存在線程池數量限制
    /// </summary>
    public class TaskQueue
    {
        public static readonly TaskQueue Instance = new TaskQueue(1, ScheduleType.Thread);

        /// <summary>
        /// 獲取隊列
        /// </summary>
        /// <param name="maxCount">最大併發數</param>
        /// <param name="type">執行計劃</param>
        /// <returns></returns>
        public static TaskQueue GetQueue(int maxCount,ScheduleType type=ScheduleType.Thread)
        {
            return new TaskQueue(maxCount, type);
        }

        /// <summary>
        /// 獲取隊列
        /// </summary>
        /// <param name="maxCount">最大併發數</param>
        /// <param name="name">隊列名稱</param>
        /// <param name="type">執行計劃</param>
        /// <returns></returns>
        public static TaskQueue GetQueue(int maxCount, string name, ScheduleType type = ScheduleType.Thread)
        {
            return new TaskQueue(maxCount, name, type);
        }

        /// <summary>
        /// 獲取隊列
        /// </summary>
        /// <param name="maxCount">最大併發數</param>
        /// <param name="name">隊列名稱</param>
        /// <param name="token">取消令牌</param>
        /// <param name="type">執行計劃</param>
        public static TaskQueue GetQueue(int maxCount, string name, CancellationToken token, ScheduleType type = ScheduleType.Thread)
        {
            return new TaskQueue(maxCount, name, token, type);
        }

        /// <summary>
        /// 獲取隊列
        /// </summary>
        /// <param name="maxCount">最大併發數</param>
        /// <param name="token">取消令牌</param>
        /// <param name="type">執行計劃</param>
        public static TaskQueue GetQueue(int maxCount, CancellationToken token, ScheduleType type = ScheduleType.Thread)
        {
            return new TaskQueue(maxCount, IdentityGenerator.NewGuidString(), token, type);
        }

        private ConcurrentQueue<System.Threading.Tasks.Task> _tasks;
        private readonly int _limitedTaskCount;
        private int _runningTaskCount;
        private Thread _mainExcuteThread;
        private CancellationToken _token;
        private TaskScheduler _scheduler;

        private TaskQueue(int maxCount, ScheduleType type) :
            this(maxCount, IdentityGenerator.NewGuidString(), type)
        {
        }

        private TaskQueue(int maxCount, string name, ScheduleType type) :
            this(maxCount, name, CancellationToken.None, type)

        {
        }

        private TaskQueue(int maxCount, string name, CancellationToken token, ScheduleType type)
        {
            _limitedTaskCount = maxCount;
            _tasks = new ConcurrentQueue<System.Threading.Tasks.Task>();
            Name = name;
            _token = token;
            switch (type)
            {
                case ScheduleType.Default:
                    _scheduler = new QueuedTaskScheduler(maxCount);
                    break;
                case ScheduleType.Thread:
                    _scheduler = new ThreadPerTaskScheduler();
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(type), type, null);
            }

        }

        /// <summary>
        /// 執行不帶返回結果的方法
        /// </summary>
        /// <param name="func"></param>
        /// <returns></returns>
        public System.Threading.Tasks.Task Execute(Action func)
        {
            var task = new System.Threading.Tasks.Task(func, _token);
            _tasks.Enqueue(task);
            if (_mainExcuteThread == null || _mainExcuteThread.ThreadState.HasFlag(ThreadState.Stopped))
            {
                _mainExcuteThread?.DisableComObjectEagerCleanup();
                _mainExcuteThread = new Thread(NotifyThreadPendingWork);
                _mainExcuteThread.Start();
            }
            return task;
        }

        /// <summary>
        /// 執行帶返回結果的方法
        /// </summary>
        /// <typeparam name="TResult"></typeparam>
        /// <param name="func"></param>
        /// <returns></returns>

        [MethodImpl(MethodImplOptions.Synchronized)]
        public Task<TResult> Execute<TResult>(Func<TResult> func)
        {
            var task = new Task<TResult>(func, _token);
            _tasks.Enqueue(task);
            if (_mainExcuteThread == null || _mainExcuteThread.ThreadState.HasFlag(ThreadState.Stopped))
            {
                _mainExcuteThread?.DisableComObjectEagerCleanup();
                _mainExcuteThread = new Thread(NotifyThreadPendingWork);
                _mainExcuteThread.Priority = ThreadPriority.Highest;
                _mainExcuteThread.Start();
            }
            return task;
        }

        private void NotifyThreadPendingWork()
        {
            try
            {
                while (true)
                {
                    if (_token.IsCancellationRequested)
                    {
                        _tasks = new ConcurrentQueue<System.Threading.Tasks.Task>();
                        break;
                    }

                    System.Threading.Tasks.Task task;
                    if (!_tasks.TryDequeue(out task))
                        break;

                    task.Start(_scheduler);
                    Interlocked.Increment(ref _runningTaskCount);
                    task.ContinueWith(item =>
                    {
                        Interlocked.Decrement(ref _runningTaskCount);
                    });
                    //Debug.WriteLine("隊列{0},容許執行 {1} 條,等待線程爲 {2} ,執行中 {3} 條,時間爲 {4} ", _name, _limitedTaskCount, _tasks.Count, _runningTaskCount, DateTime.Now);
                    while (_runningTaskCount >= _limitedTaskCount)
                    {
                        Thread.Sleep(500);
                    }
                }
            }
            finally
            {
                _runningTaskCount = 0;
            }
        }

        /// <summary>
        /// 線程隊列的名字
        /// </summary>
        public string Name { get; }

        /// <summary>
        /// 根據返回的task的id獲取到當前task排隊的位置
        /// </summary>
        /// <param name="taskId"></param>
        /// <returns>返回-1表示正在執行,或者task沒有加進去,返回大於等於0則表示其順序</returns>
        public int GetCurrentTaskIndex(int taskId)
        {
            lock (_tasks)
            {
                return _tasks.IndexOf(item => item.Id == taskId);
            }
        }

        /// <summary>
        /// 等待執行的線程數量
        /// </summary>
        public int WaitingTaskCount => _tasks.Count;
        /// <summary>
        /// 正在執行的線程數量
        /// </summary>
        public int RunningTaskCount => _runningTaskCount;
        /// <summary>
        /// 併發數
        /// </summary>
        public int LimitedTaskCount => _limitedTaskCount;
    }

    /// <summary>
    /// 
    /// </summary>
    public enum ScheduleType {
        /// <summary>
        /// 線程池,默認方式
        /// </summary>
        Default,
        /// <summary>
        /// 自定義線程,ThreadPerTaskScheduler
        /// </summary>
        Thread
    }
    
}

  

重點部分說明
1. ConcurrentQueue 自己在處理多線程環境因此採用線程安全的隊列。
2. ContinueWith 在任務執行完畢以後須要對執行的數量-1。
3. NotifyThreadPendingWork 這裏就是啓動另一個主線程來對任務進行分發。
4. 主線程分發來知足異步的需求。
5. CancellationToken 提供取消執行的功能。
6. 能夠採用默認也能夠本身實例化,而默認是一個線程,即考慮經常使用狀況,也提供擴展的功能。異步

測試代碼


寫出來這個隊列的部分可能只用了1小時,可是寫測試代碼和調試用了差很少半天時間才搞定。

線程的測試在單元測試中一直是難以控制的,在這個case中多個線程併發的狀況下,實時獲取排隊數量,執行中數量也是個很難測試的部分。

 

下面是測試代碼:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Coralcode.Framework.Task;
using Coralcode.Framework.Utils;
using iTextSharp.text;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace FrameworkTest.Task
{
    [TestClass]
    public class TaskQueueTest
    {
        [TestMethod]
        public void TwoTimesExcuteWithFreeBetweenTwoTimsTest()
        {
            var wantExecuteTaskCount = 10;
            var maxCanExcuteTaskCount = 1;
            var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount);
            var addList = new List<int>();
            var executedList = new List<int>();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                addList.Add(i);
            }
            /*
             * 測試原理:
             * 添加一批任務,等待執行完成之後在執行一次
             * 第二次也能執行完畢,無報錯的話認爲,
             * 隊列在執行完畢後自動暫停,等有新任務進來的時候能夠從新啓動
             */
            addList.ForEach(index =>
            {
                var list = executedList;
                queue.Execute(() =>
                {
                    var item = index / maxCanExcuteTaskCount;
                    list.Add(item);
                    return true;
                });
            });
            while (addList.Count != executedList.Count)
            {
                Thread.Sleep(1000);

            }
            Assert.AreEqual(queue.WaitingTaskCount, 0);
            executedList.Clear();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                addList.Add(i);
            }
            addList.ForEach(index =>
            {
                var list = executedList;
                queue.Execute(() =>
                {
                    var item = index / maxCanExcuteTaskCount;
                    list.Add(item);
                    return true;
                });
            });
            while (addList.Count != executedList.Count)
            {
                Thread.Sleep(1000);
            }
            Assert.AreEqual(queue.WaitingTaskCount, 0);
        }




        [TestMethod]
        public void ExecuteAsAddSequenceTest()
        {
            var wantExecuteTaskCount = 10;
            var maxCanExcuteTaskCount = 1;
            var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount);
            var addList = new List<int>();
            var executedList = new List<int>();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                addList.Add(i);
            }


            /*
             * 測試原理:
             * 按需添加,而後記錄執行,結果也必須是順序的
             * 若是一次執行多條,那麼多條被認爲同一個批次
             * 同一個批次的順序應該是一致的
             */
            addList.ForEach(index =>
            {
                var list = executedList;
                queue.Execute(() =>
                {
                    var item = index / maxCanExcuteTaskCount;
                    list.Add(item);
                    return true;
                });
            });

            while (addList.Count != executedList.Count)
            {
                Thread.Sleep(1000);

            }
            addList = addList.Select(item => item / maxCanExcuteTaskCount).Distinct().ToList();
            executedList = executedList.Distinct().ToList();
            for (int i = 0; i < addList.Count; i++)
            {
                Assert.AreEqual(addList[i], executedList[i]);
            }
        }



        [TestMethod]
        public void WaitCountTest()
        {
            var wantExecuteTaskCount = 10;
            var maxCanExecuteTaskCount = 2;
            var queue = TaskQueue.GetQueue(maxCanExecuteTaskCount);
            var executedOrExecutingCount = 0;

            /*
             * 測試原理:
             * 記錄執行了的數量
             * 隊列中等待的數量 =最開始放入的數量-隊列運行執行的最大線程數-已經執行的數量
             * 當全部的線程都在被執行時,隊列中等待數量爲0,可是還未執行完的話,直接用0來判斷
             */
            var taskList = new List<System.Threading.Tasks.Task>();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                var task = queue.Execute(() =>
                {
                    Thread.Sleep(2000);
                    Debug.WriteLine("已執行的數量" + executedOrExecutingCount);
                    Interlocked.Increment(ref executedOrExecutingCount);
                });
                taskList.Add(task);
            }
            // ReSharper disable once LoopVariableIsNeverChangedInsideLoop
            while (executedOrExecutingCount < wantExecuteTaskCount)
            {
                Thread.Sleep(700);
                var queueWaitCount = queue.WaitingTaskCount;
                var wantWaitCount = wantExecuteTaskCount - maxCanExecuteTaskCount - executedOrExecutingCount;
                Debug.WriteLine("************");
                Debug.WriteLine("隊列等待執行的數量" + queueWaitCount);
                Debug.WriteLine("隊列指望等待執行的數量" + wantWaitCount);
                Debug.WriteLine("************");
                if (wantWaitCount < 0)
                    Assert.AreEqual(queueWaitCount, 0);
                else
                    Assert.AreEqual(queueWaitCount, wantWaitCount);
            }
        }

        [TestMethod]
        public void WaitTest()
        {

            //System.Threading.Tasks.Task.Factory.StartNew(() =>
            //{
            //    try
            //    {
            //        var str = HttpUtil.Get<string>("http://172.16.2.3:10004",
            //        "api/Acl/PermissionAuthorityApi/GetPermissionCountByApp?app=1");
            //    }
            //    catch (Exception e)
            //    {
            //        Console.WriteLine(e);
            //        throw;
            //    }

            //});
            var wantExecuteTaskCount = 10;
            var maxCanExcuteTaskCount = 1;
            var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount);
            var addList = new List<int>();
            var executedList = new List<int>();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                addList.Add(i);
            }

            var tasks = new List<System.Threading.Tasks.Task>();
            /*
             * 測試原理:
             * 執行異步請求看最後是否成功執行
             */
            addList.ForEach(index =>
            {
                var list = executedList;
                var task = queue.Execute(() =>
                {
                    try
                    {
                        var str = HttpUtil.Get<string>("http://172.16.2.3:10004",
                            "api/Acl/PermissionAuthorityApi/GetPermissionCountByApp?app=1");
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e);
                    }
                    var item = index / maxCanExcuteTaskCount;
                    list.Add(item);
                });
                tasks.Add(task);
            });
            System.Threading.Tasks.Task.WaitAll(tasks.ToArray());

            //System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
            addList = addList.Select(item => item / maxCanExcuteTaskCount).Distinct().ToList();
            executedList = executedList.Distinct().ToList();
            for (int i = 0; i < addList.Count; i++)
            {
                Assert.AreEqual(addList[i], executedList[i]);
            }
        }

        [TestMethod]
        public void ExceptionTestTest()
        {
            var wantExecuteTaskCount = 10;
            var maxCanExecuteTaskCount = 2;
            var queue = TaskQueue.GetQueue(maxCanExecuteTaskCount);
            var executedOrExecutingCount = 0;

            /*
             * 發現部分狀況下隊列裏面的異常檢測不到
             * 這裏作了測試
             */

            queue.Execute(() =>
            {
                try
                {
                    throw new Exception("測試異常");
                }
                catch (Exception ex)
                {
                    Assert.IsNotNull(ex);
                }
            });

        }


        [TestMethod]
        public void GetCurrentTaskIndexTeset()
        {
            var wantExecuteTaskCount = 10;
            var maxCount = 10;
            var waitSeconds = 1000;
            var queue = TaskQueue.GetQueue(maxCount);

            List< System.Threading.Tasks.Task> executeingTasks = new List<System.Threading.Tasks.Task>();
            /*
             * 測試方法:
             * 假設併發爲10,那麼首先塞進去10個線程執行,
             * 那麼後面的線程都在等待,
             * 在放進去n個,則後面的正在排隊,返回的位置和放進去的位置一致
             * 而以前放進去的10個都在執行,返回-1
             */
            for (int i = 0; i < maxCount; i++)
            {
               var task= queue.Execute(() =>
                {
                    try
                    {
                        Thread.Sleep(waitSeconds * 1000);
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e);
                    }
                });
                executeingTasks.Add(task);
            }
            Dictionary<int,System.Threading.Tasks.Task> tasks = new Dictionary<int, System.Threading.Tasks.Task>();
            for (int i = 0; i < wantExecuteTaskCount; i++)
            {
                var task = queue.Execute(() =>
                {
                    try
                    {
                        Thread.Sleep(waitSeconds*1000);
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e);
                    }
                });
                tasks.Add(i,task);
            }
            foreach (var task in executeingTasks)
            {
                Assert.AreEqual(queue.GetCurrentTaskIndex(task.Id), -1);
            }
            foreach (var task in tasks)
            {
                Assert.AreEqual(queue.GetCurrentTaskIndex(task.Value.Id),task.Key);
            }

        }

    }
}

  


具體的代碼中都有每一個測試用例的設計思考。

 


注意事項

注意事項就是我在這裏踩過的坑

Session丟失


在web應用中session的存儲實際以線程爲基礎,相似ThreadLocal實現的線程隔離和靜態使用。
這裏再執行以前須要預先取出來要使用的對象而後再在action/func中使用

DbContext


1. 在ef中若是獲取一個dbcontext在另一個線程中保存就會報錯。
2. 若是一個從ef取出來的對象從一個線程傳遞到另一個線程,修改提交就會報錯。
3. 若是主線程和隊列中線程同時操做例如一個讀一個寫,或者同時寫,此時也會報錯


Ioc的問題

 

若是ioc採用prethread或者preresolve這兩種方式來管理生命週期,理論上在http請求結束的時候都要對線程中使用的對象進行釋放,

由於隊列爲異步那麼當請求結束時候線程卻還在執行,此時就會出現空指針的問題,或者ef中對象已經釋放。


最佳實踐

        /// <summary>
        /// 調用示例
        /// 
        /// </summary>
        /// <typeparam name="TService"></typeparam>
        public  static void AsyncExecuteServiceAction<TService>(this TService service,string actionName,params  object[] parameters)
            where TService:CoralService
        {
            TaskQueue.Instance.Execute(() =>
            {
                using (var newservice = UnityService.Resolve<TService>())
                {
                    newservice.InitContext(service.AppContext,service.UserContext,service.SessionContext,service.PageContext);
                    newservice.ExecuteMethod<object>(actionName, parameters);
                }
            });
        }

  

1. service.AppContext,service.UserContext,service.SessionContext,service.PageContext 這是四種不一樣級別的緩存,理解爲session
2. using 的目的就是爲了釋放對象
3. 反射參數的方式只是爲了方便調用

 


總結
1. 代碼不難,直接拷貝就可使用,單元測試也是同樣
2. TPL我以爲是在.net技術中很好用的一部分,須要熟悉
3. 線程中有不少對象,鎖等問題,這個要隨着經驗不斷的挖坑填坑來增加經驗


展望

1. 前面說過兩種線程計劃,我這裏放入prethread是在某個工具中有壓力測試的部分,須要以最快的速度來建立線程,因此沒有采用複用的方式,具體後面有一個接口測試工具的系列文章來介紹。2. 這裏說的線程隊列,和前面動態類型序列化是後面一個做業調度系統的基礎,這部分還在設計和實現中,預計月底能夠在文章中和你們見面。3. 其實上一篇和這一篇都是在爲做業系統作鋪墊。4. 下一篇將介紹一個定時執行任務的設計。5. 雖然我一直以爲設計是最重要的,實現其次,可是要落地仍是要依賴於實現,因此這些基本的組件和幫助類仍是須要的。

相關文章
相關標籤/搜索